Repository: beam Updated Branches: refs/heads/master afe8b0ea1 -> 5cb7be78b
http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index aa6090d..65253f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -19,12 +19,12 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; -import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO; -import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2; -import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE; -import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP; -import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; -import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP; +import static org.apache.beam.sdk.io.Compression.AUTO; +import static org.apache.beam.sdk.io.Compression.BZIP2; +import static org.apache.beam.sdk.io.Compression.DEFLATE; +import static org.apache.beam.sdk.io.Compression.GZIP; +import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED; +import static org.apache.beam.sdk.io.Compression.ZIP; import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; @@ -63,7 +63,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -121,7 +120,7 @@ public class TextIOReadTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - private static File writeToFile(List<String> lines, String filename, CompressionType compression) + private static File writeToFile(List<String> lines, String filename, Compression compression) throws IOException { File file = tempFolder.resolve(filename).toFile(); OutputStream output = new FileOutputStream(file); @@ -153,19 +152,19 @@ public class TextIOReadTest { public static void setupClass() throws IOException { tempFolder = Files.createTempDirectory("TextIOTest"); // empty files - emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED); + emptyTxt = writeToFile(EMPTY, "empty.txt", UNCOMPRESSED); emptyGz = writeToFile(EMPTY, "empty.gz", GZIP); emptyBzip2 = writeToFile(EMPTY, "empty.bz2", BZIP2); emptyZip = writeToFile(EMPTY, "empty.zip", ZIP); emptyDeflate = writeToFile(EMPTY, "empty.deflate", DEFLATE); // tiny files - tinyTxt = writeToFile(TINY, "tiny.txt", CompressionType.UNCOMPRESSED); + tinyTxt = writeToFile(TINY, "tiny.txt", UNCOMPRESSED); tinyGz = writeToFile(TINY, "tiny.gz", GZIP); tinyBzip2 = writeToFile(TINY, "tiny.bz2", BZIP2); tinyZip = writeToFile(TINY, "tiny.zip", ZIP); tinyDeflate = writeToFile(TINY, "tiny.deflate", DEFLATE); // large files - largeTxt = writeToFile(LARGE, "large.txt", CompressionType.UNCOMPRESSED); + largeTxt = writeToFile(LARGE, "large.txt", UNCOMPRESSED); largeGz = writeToFile(LARGE, "large.gz", GZIP); largeBzip2 = writeToFile(LARGE, "large.bz2", BZIP2); largeZip = writeToFile(LARGE, "large.zip", ZIP); @@ -235,7 +234,7 @@ public class TextIOReadTest { @Test public void testReadDisplayData() { - TextIO.Read read = TextIO.read().from("foo.*").withCompressionType(BZIP2); + TextIO.Read read = TextIO.read().from("foo.*").withCompression(BZIP2); DisplayData displayData = DisplayData.from(read); @@ -274,11 +273,11 @@ public class TextIOReadTest { } @Test - public void testCompressionTypeIsSet() throws Exception { + public void testCompressionIsSet() throws Exception { TextIO.Read read = TextIO.read().from("/tmp/test"); - assertEquals(AUTO, read.getCompressionType()); - read = TextIO.read().from("/tmp/test").withCompressionType(GZIP); - assertEquals(GZIP, read.getCompressionType()); + assertEquals(AUTO, read.getCompression()); + read = TextIO.read().from("/tmp/test").withCompression(GZIP); + assertEquals(GZIP, read.getCompression()); } /** @@ -299,34 +298,34 @@ public class TextIOReadTest { * * <p>The transforms being verified are: * <ul> - * <li>TextIO.read().from(filename).withCompressionType(compressionType) - * <li>TextIO.read().from(filename).withCompressionType(compressionType) + * <li>TextIO.read().from(filename).withCompression(compressionType) + * <li>TextIO.read().from(filename).withCompression(compressionType) * .withHintMatchesManyFiles() - * <li>TextIO.readAll().withCompressionType(compressionType) + * <li>TextIO.readAll().withCompression(compressionType) * </ul> and */ private void assertReadingCompressedFileMatchesExpected( - File file, CompressionType compressionType, List<String> expected) { + File file, Compression compression, List<String> expected) { int thisUniquifier = ++uniquifier; - TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType); + TextIO.Read read = TextIO.read().from(file.getPath()).withCompression(compression); PAssert.that( - p.apply("Read_" + file + "_" + compressionType.toString() + "_" + thisUniquifier, read)) + p.apply("Read_" + file + "_" + compression.toString() + "_" + thisUniquifier, read)) .containsInAnyOrder(expected); PAssert.that( p.apply( - "Read_" + file + "_" + compressionType.toString() + "_many" + "_" + thisUniquifier, + "Read_" + file + "_" + compression.toString() + "_many" + "_" + thisUniquifier, read.withHintMatchesManyFiles())) .containsInAnyOrder(expected); TextIO.ReadAll readAll = - TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10); + TextIO.readAll().withCompression(compression).withDesiredBundleSizeBytes(10); PAssert.that( p.apply("Create_" + file + "_" + thisUniquifier, Create.of(file.getPath())) - .apply("Read_" + compressionType.toString() + "_" + thisUniquifier, readAll)) + .apply("Read_" + compression.toString() + "_" + thisUniquifier, readAll)) .containsInAnyOrder(expected); } @@ -357,7 +356,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testSmallCompressedGzipReadActuallyUncompressed() throws Exception { File smallGzNotCompressed = - writeToFile(TINY, "tiny_uncompressed.gz", CompressionType.UNCOMPRESSED); + writeToFile(TINY, "tiny_uncompressed.gz", UNCOMPRESSED); // Should work with GZIP compression set. assertReadingCompressedFileMatchesExpected(smallGzNotCompressed, GZIP, TINY); // Should also work with AUTO mode set. @@ -412,7 +411,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testTxtRead() throws Exception { // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes. - for (CompressionType type : new CompressionType[] {AUTO, UNCOMPRESSED}) { + for (Compression type : new Compression[] {AUTO, UNCOMPRESSED}) { assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY); assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE); @@ -424,7 +423,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testGzipCompressedRead() throws Exception { // Files with the right extensions should work in AUTO and GZIP modes. - for (CompressionType type : new CompressionType[] {AUTO, GZIP}) { + for (Compression type : new Compression[] {AUTO, GZIP}) { assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY); assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE); @@ -443,7 +442,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testBzip2CompressedRead() throws Exception { // Files with the right extensions should work in AUTO and BZIP2 modes. - for (CompressionType type : new CompressionType[] {AUTO, BZIP2}) { + for (Compression type : new Compression[] {AUTO, BZIP2}) { assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY); assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE); @@ -462,7 +461,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testZipCompressedRead() throws Exception { // Files with the right extensions should work in AUTO and ZIP modes. - for (CompressionType type : new CompressionType[] {AUTO, ZIP}) { + for (Compression type : new Compression[] {AUTO, ZIP}) { assertReadingCompressedFileMatchesExpected(emptyZip, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyZip, type, TINY); assertReadingCompressedFileMatchesExpected(largeZip, type, LARGE); @@ -481,7 +480,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testDeflateCompressedRead() throws Exception { // Files with the right extensions should work in AUTO and ZIP modes. - for (CompressionType type : new CompressionType[] {AUTO, DEFLATE}) { + for (Compression type : new Compression[] {AUTO, DEFLATE}) { assertReadingCompressedFileMatchesExpected(emptyDeflate, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyDeflate, type, TINY); assertReadingCompressedFileMatchesExpected(largeDeflate, type, LARGE); @@ -504,7 +503,7 @@ public class TextIOReadTest { @Category(NeedsRunner.class) public void testZipCompressedReadWithNoEntries() throws Exception { String filename = createZipFile(new ArrayList<String>(), "empty zip file"); - assertReadingCompressedFileMatchesExpected(new File(filename), CompressionType.ZIP, EMPTY); + assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, EMPTY); p.run(); } @@ -522,7 +521,7 @@ public class TextIOReadTest { List<String> expected = new ArrayList<>(); String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2); - assertReadingCompressedFileMatchesExpected(new File(filename), CompressionType.ZIP, expected); + assertReadingCompressedFileMatchesExpected(new File(filename), ZIP, expected); p.run(); } @@ -543,7 +542,7 @@ public class TextIOReadTest { new String[] {"dog"}); assertReadingCompressedFileMatchesExpected( - new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog")); + new File(filename), ZIP, Arrays.asList("cat", "dog")); p.run(); } @@ -836,7 +835,7 @@ public class TextIOReadTest { assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize)); FileBasedSource<String> source = - TextIO.read().from(largeTxt.getPath()).withCompressionType(GZIP).getSource(); + TextIO.read().from(largeTxt.getPath()).withCompression(GZIP).getSource(); List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); // Exactly 1 split, even though splittable text file, since using GZIP mode. @@ -853,7 +852,7 @@ public class TextIOReadTest { assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize)); FileBasedSource<String> source = - TextIO.read().from(largeGz.getPath()).withCompressionType(GZIP).getSource(); + TextIO.read().from(largeGz.getPath()).withCompression(GZIP).getSource(); List<? extends FileBasedSource<String>> splits = source.split(desiredBundleSize, options); // Exactly 1 split using .gz extension and using GZIP mode. @@ -873,7 +872,7 @@ public class TextIOReadTest { Create.of( tempFolder.resolve("readAllTiny*").toString(), tempFolder.resolve("readAllLarge*").toString())) - .apply(TextIO.readAll().withCompressionType(AUTO)); + .apply(TextIO.readAll().withCompression(AUTO)); PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); p.run(); } @@ -888,7 +887,7 @@ public class TextIOReadTest { TextIO.read() .from(basePath.resolve("*").toString()) // Make sure that compression type propagates into readAll() - .withCompressionType(ZIP) + .withCompression(ZIP) .watchForNewFiles( Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3)))); @@ -901,17 +900,17 @@ public class TextIOReadTest { writeToFile( Arrays.asList("a.1", "a.2"), basePath.resolve("fileA").toString(), - CompressionType.ZIP); + ZIP); Thread.sleep(300); writeToFile( Arrays.asList("b.1", "b.2"), basePath.resolve("fileB").toString(), - CompressionType.ZIP); + ZIP); Thread.sleep(300); writeToFile( Arrays.asList("c.1", "c.2"), basePath.resolve("fileC").toString(), - CompressionType.ZIP); + ZIP); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 5e0d685..e0f7b39 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -48,7 +48,6 @@ import java.util.regex.Pattern; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; @@ -352,7 +351,7 @@ public class WriteFilesTest { .withShardTemplate("-SS-of-NN"))); SimpleSink<Void> sink = new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED) { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); @@ -453,7 +452,7 @@ public class WriteFilesTest { TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory()); SimpleSink<Integer> sink = new SimpleSink<>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED); + getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED); // Flag to validate that the pipeline options are passed to the Sink. WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); @@ -508,7 +507,7 @@ public class WriteFilesTest { .withShardTemplate("-SS-of-NN"))); SimpleSink<Void> sink = new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED) { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); @@ -533,7 +532,7 @@ public class WriteFilesTest { .withShardTemplate("-SS-of-NN"))); SimpleSink<Void> sink = new SimpleSink<Void>( - getBaseOutputDirectory(), dynamicDestinations, CompressionType.UNCOMPRESSED) { + getBaseOutputDirectory(), dynamicDestinations, Compression.UNCOMPRESSED) { @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("foo", "bar")); http://git-wip-us.apache.org/repos/asf/beam/blob/54489f0d/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index 7255a94..1e41b8d 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -29,6 +29,7 @@ import javax.xml.bind.ValidationEventHandler; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CompressedSource; +import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.OffsetBasedSource; import org.apache.beam.sdk.io.fs.ResourceId; @@ -113,7 +114,7 @@ public class XmlIO { public static <T> Read<T> read() { return new AutoValue_XmlIO_Read.Builder<T>() .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) - .setCompressionType(Read.CompressionType.AUTO) + .setCompression(Compression.AUTO) .setCharset("UTF-8") .build(); } @@ -247,7 +248,7 @@ public class XmlIO { @Nullable abstract Class<T> getRecordClass(); - abstract CompressionType getCompressionType(); + abstract Compression getCompression(); abstract long getMinBundleSize(); @@ -271,7 +272,7 @@ public class XmlIO { abstract Builder<T> setMinBundleSize(long minBundleSize); - abstract Builder<T> setCompressionType(CompressionType compressionType); + abstract Builder<T> setCompression(Compression compression); abstract Builder<T> setCharset(String charset); @@ -280,35 +281,36 @@ public class XmlIO { abstract Read<T> build(); } - /** Strategy for determining the compression type of XML files being read. */ + /** @deprecated Use {@link Compression} instead. */ + @Deprecated public enum CompressionType { - /** Automatically determine the compression type based on filename extension. */ - AUTO(""), - /** Uncompressed (i.e., may be split). */ - UNCOMPRESSED(""), - /** GZipped. */ - GZIP(".gz"), - /** BZipped. */ - BZIP2(".bz2"), - /** Zipped. */ - ZIP(".zip"), - /** Deflate compressed. */ - DEFLATE(".deflate"); - - private String filenameSuffix; - - CompressionType(String suffix) { - this.filenameSuffix = suffix; + /** @see Compression#AUTO */ + AUTO(Compression.AUTO), + + /** @see Compression#UNCOMPRESSED */ + UNCOMPRESSED(Compression.UNCOMPRESSED), + + /** @see Compression#GZIP */ + GZIP(Compression.GZIP), + + /** @see Compression#BZIP2 */ + BZIP2(Compression.BZIP2), + + /** @see Compression#ZIP */ + ZIP(Compression.ZIP), + + /** @see Compression#DEFLATE */ + DEFLATE(Compression.DEFLATE); + + private Compression canonical; + + CompressionType(Compression canonical) { + this.canonical = canonical; } - /** - * Determine if a given filename matches a compression type based on its extension. - * - * @param filename the filename to match - * @return true iff the filename ends with the compression type's known extension. - */ + /** @see Compression#matches */ public boolean matches(String filename) { - return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); + return canonical.matches(filename); } } @@ -355,15 +357,15 @@ public class XmlIO { return toBuilder().setMinBundleSize(minBundleSize).build(); } - /** - * Decompresses all input files using the specified compression type. - * - * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. In this - * mode, the compression type of the file is determined by its extension. Supports .gz, .bz2, - * .zip and .deflate compression. - */ + /** @deprecated use {@link #withCompression}. */ + @Deprecated public Read<T> withCompressionType(CompressionType compressionType) { - return toBuilder().setCompressionType(compressionType).build(); + return withCompression(compressionType.canonical); + } + + /** Decompresses all input files using the specified compression type. */ + public Read<T> withCompression(Compression compression) { + return toBuilder().setCompression(compression).build(); } /** @@ -417,27 +419,7 @@ public class XmlIO { @VisibleForTesting BoundedSource<T> createSource() { - XmlSource<T> source = new XmlSource<>(this); - switch (getCompressionType()) { - case UNCOMPRESSED: - return source; - case AUTO: - return CompressedSource.from(source); - case BZIP2: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.BZIP2); - case GZIP: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.GZIP); - case ZIP: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.ZIP); - case DEFLATE: - return CompressedSource.from(source) - .withDecompression(CompressedSource.CompressionMode.DEFLATE); - default: - throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); - } + return CompressedSource.from(new XmlSource<>(this)).withCompression(getCompression()); } @Override