Repository: incubator-beam Updated Branches: refs/heads/master 49f944430 -> b7b68e6fb
[BEAM-55] TextIO & FileBasedSink: Add support for compressed output [BEAM-55] Add customizable file-based output support through DecoratedFileSink and concrete Gzip file-based output support through WriterOutputGzipDecoratorFactory [BEAM-55] Add example pipeline usage and a few eclipse-related entries to gitignore [BEAM-55] Move logic from DecoratedFileSink into FileBasedSink and TextIO to allow for direct support of customizable file-based, including built-in Gzip and Bzip2 support, per request by @dhalperi [BEAM-697] TextIO.Write.Bound.withHeader() and withFooter() now pass current value of validate to returned Bound instance instead of literal 'false' [BEAM-55] Simplified overlycomplex API, removed example pipeline, cleaned up comment formatting, added small usage example to main TextIO javadoc, per request by @dhalperi [BEAM-55] Revert erroneous formatting, remove old comments, fix javadoc, inline superclass only used in one unit test [BEAM-55] Add writableByteChannelFactory to DisplayData, move fix for BEAM-697 into its own PR Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa589ee4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa589ee4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa589ee4 Branch: refs/heads/master Commit: fa589ee4e16b91a06e2b78d870b1fa70ba5834e8 Parents: 49f9444 Author: Jeffrey Scott Keone Payne <jeffkpa...@gmail.com> Authored: Thu Sep 29 12:23:44 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Oct 14 10:14:48 2016 -0700 ---------------------------------------------------------------------- .gitignore | 5 + .../org/apache/beam/sdk/io/FileBasedSink.java | 138 ++++++++++++++++++- .../java/org/apache/beam/sdk/io/TextIO.java | 91 +++++++++--- .../sdk/io/DrunkWritableByteChannelFactory.java | 80 +++++++++++ .../apache/beam/sdk/io/FileBasedSinkTest.java | 108 +++++++++++++++ 
.../java/org/apache/beam/sdk/io/TextIOTest.java | 59 ++++++-- 6 files changed, 447 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 5390dd0..d6cffec 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,8 @@ bin/ .project .factorypath .checkstyle +.fbExcludeFilterFile +.apt_generated/ .settings/ # The build process generates the dependency-reduced POM, but it shouldn't be @@ -27,6 +29,9 @@ dependency-reduced-pom.xml # produced by a text editor. *~ +# Ignore MacOSX files. +.DS_Store + # NOTE: if you modify this file, you probably need to modify the file set that # is an input to 'maven-assembly-plugin' that generates source distribution. # This is typically in files named 'src.xml' throughout this repository. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index f571d50..7e95c5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -20,11 +20,14 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; import com.google.common.collect.Ordering; + import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.nio.channels.Channels; import 
java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -34,6 +37,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.zip.GZIPOutputStream; + +import javax.annotation.Nullable; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -45,6 +52,7 @@ import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +77,64 @@ import org.slf4j.LoggerFactory; */ public abstract class FileBasedSink<T> extends Sink<T> { /** + * Directly supported file output compression types. + */ + public static enum CompressionType implements WritableByteChannelFactory { + /** + * No compression, or any other transformation, will be used. + */ + UNCOMPRESSED("", MimeTypes.TEXT) { + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return channel; + } + }, + /** + * Provides GZip output transformation. + */ + GZIP(".gz", MimeTypes.BINARY) { + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true)); + } + }, + /** + * Provides BZip2 output transformation. 
+ */ + BZIP2(".bz2", MimeTypes.BINARY) { + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return Channels + .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); + } + }; + + private String filenameSuffix; + private String mimeType; + + private CompressionType(String suffix, String mimeType) { + this.filenameSuffix = suffix; + this.mimeType = mimeType; + } + + @Override + public String getFilenameSuffix() { + return filenameSuffix; + } + + @Override + public String getMimeType() { + return mimeType; + } + } + + /** + * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the + * underlying channel. The default is {@link CompressionType#UNCOMPRESSED} (no compression). + */ + protected final WritableByteChannelFactory writableByteChannelFactory; + + /** * Base filename for final output files. */ protected final String baseOutputFilename; @@ -85,21 +151,48 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected final String fileNamingTemplate; /** - * Construct a FileBasedSink with the given base output filename and extension. + * Construct a FileBasedSink with the given base output filename and extension. A + * {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} will be used. */ public FileBasedSink(String baseOutputFilename, String extension) { this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX); } /** + * Construct a FileBasedSink with the given base output filename, extension, and + * {@link WritableByteChannelFactory}. + */ + public FileBasedSink(String baseOutputFilename, String extension, + WritableByteChannelFactory writableByteChannelFactory) { + this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory); + } + + /** + * Construct a FileBasedSink with the given base output filename, extension, and file naming - * template. 
A {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} + * will be used. * * <p>See {@link ShardNameTemplate} for a description of file naming templates. */ public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) { + this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED); + } + + /** + * Construct a FileBasedSink with the given base output filename, extension, file naming template, + * and {@link WritableByteChannelFactory}. + * + * <p>See {@link ShardNameTemplate} for a description of file naming templates. + */ + public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate, + WritableByteChannelFactory writableByteChannelFactory) { + this.writableByteChannelFactory = writableByteChannelFactory; this.baseOutputFilename = baseOutputFilename; - this.extension = extension; + if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) { + this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix()); + } else { + this.extension = extension; + } this.fileNamingTemplate = fileNamingTemplate; } @@ -492,7 +585,10 @@ public abstract class FileBasedSink<T> extends Sink<T> { filename = FileBasedWriteOperation.buildTemporaryFilename( getWriteOperation().baseTemporaryFilename, uId); LOG.debug("Opening {}.", filename); - channel = IOChannelUtils.create(filename, mimeType); + final WritableByteChannelFactory factory = + getWriteOperation().getSink().writableByteChannelFactory; + mimeType = factory.getMimeType(); + channel = factory.create(IOChannelUtils.create(filename, mimeType)); try { prepareWrite(channel); LOG.debug("Writing header to {}.", filename); @@ -514,7 +610,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { } /** - * Closes the channel and return the bundle result. + * Closes the channel and returns the bundle result. 
*/ @Override public final FileResult close() throws Exception { @@ -674,4 +770,36 @@ public abstract class FileBasedSink<T> extends Sink<T> { } } } + + /** + * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink} + * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that + * would normally be written directly to the {@link WritableByteChannel} passed into + * {@link WritableByteChannelFactory#create(WritableByteChannel)}. + * + * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when + * building {@link DisplayData}. + */ + public interface WritableByteChannelFactory extends Serializable { + /** + * @param channel the {@link WritableByteChannel} to wrap + * @return the {@link WritableByteChannel} to be used during output + * @throws IOException + */ + public WritableByteChannel create(WritableByteChannel channel) throws IOException; + + /** + * @return the MIME type that should be used for the files that will hold the output data + * @see MimeTypes + * @see <a href= + * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a> + */ + public String getMimeType(); + + /** + * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP} + */ + @Nullable + public String getFilenameSuffix(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 9d91dff..3ae2a0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -17,12 +17,13 @@ */ package 
org.apache.beam.sdk.io; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -33,11 +34,14 @@ import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.regex.Pattern; + import javax.annotation.Nullable; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; @@ -102,6 +106,14 @@ import org.apache.beam.sdk.values.PDone; * .to("gs://my_bucket/path/to/numbers") * .withSuffix(".txt") * .withCoder(TextualIntegerCoder.of())); + * + * // Same as above, only with Gzip compression: + * PCollection<Integer> numbers = ...; + * numbers.apply("WriteNumbers", TextIO.Write + * .to("gs://my_bucket/path/to/numbers") + * .withSuffix(".txt") + * .withCoder(TextualIntegerCoder.of()) + * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * }</pre> */ public class TextIO { @@ -458,6 +470,21 @@ public class TextIO { return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer); } + /** + * Returns a transform for writing to text files like this one but that has the given + * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The + * default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. 
+ * + * <p>A {@code null} value will reset the value to the default value mentioned above. + * + * @param writableByteChannelFactory the factory to be used during output + */ + public static Bound<String> withWritableByteChannelFactory( + WritableByteChannelFactory writableByteChannelFactory) { + return new Bound<>(DEFAULT_TEXT_CODER) + .withWritableByteChannelFactory(writableByteChannelFactory); + } + // TODO: appendingNewlines, etc. /** @@ -493,13 +520,21 @@ public class TextIO { /** An option to indicate if output validation is desired. Default is true. */ private final boolean validate; + /** + * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is + * {@link FileBasedSink.CompressionType#UNCOMPRESSED}. + */ + private final WritableByteChannelFactory writableByteChannelFactory; + Bound(Coder<T> coder) { - this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true); + this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true, + FileBasedSink.CompressionType.UNCOMPRESSED); } private Bound(String name, String filenamePrefix, String filenameSuffix, @Nullable String header, @Nullable String footer, Coder<T> coder, int numShards, - String shardTemplate, boolean validate) { + String shardTemplate, boolean validate, + WritableByteChannelFactory writableByteChannelFactory) { super(name); this.header = header; this.footer = footer; @@ -509,6 +544,8 @@ public class TextIO { this.numShards = numShards; this.shardTemplate = shardTemplate; this.validate = validate; + this.writableByteChannelFactory = + firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED); } /** @@ -522,7 +559,7 @@ public class TextIO { public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate); + shardTemplate, validate, writableByteChannelFactory); } /** @@ -536,7 +573,7 
@@ public class TextIO { public Bound<T> withSuffix(String nameExtension) { validateOutputComponent(nameExtension); return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards, - shardTemplate, validate); + shardTemplate, validate, writableByteChannelFactory); } /** @@ -556,7 +593,7 @@ public class TextIO { public Bound<T> withNumShards(int numShards) { checkArgument(numShards >= 0); return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate); + shardTemplate, validate, writableByteChannelFactory); } /** @@ -569,7 +606,7 @@ public class TextIO { */ public Bound<T> withShardNameTemplate(String shardTemplate) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate); + shardTemplate, validate, writableByteChannelFactory); } /** @@ -587,7 +624,7 @@ public class TextIO { */ public Bound<T> withoutSharding() { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "", - validate); + validate, writableByteChannelFactory); } /** @@ -600,7 +637,7 @@ public class TextIO { */ public <X> Bound<X> withCoder(Coder<X> coder) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate); + shardTemplate, validate, writableByteChannelFactory); } /** @@ -615,7 +652,7 @@ public class TextIO { */ public Bound<T> withoutValidation() { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, false); + shardTemplate, false, writableByteChannelFactory); } /** @@ -630,7 +667,7 @@ public class TextIO { */ public Bound<T> withHeader(@Nullable String header) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, false); + shardTemplate, false, writableByteChannelFactory); } /** @@ -645,7 +682,24 @@ public class TextIO { */ public Bound<T> 
withFooter(@Nullable String footer) { return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, false); + shardTemplate, false, writableByteChannelFactory); + } + + /** + * Returns a transform for writing to text files like this one but that has the given + * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. + * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}. + * + * <p>A {@code null} value will reset the value to the default value mentioned above. + * + * <p>Does not modify this object. + * + * @param writableByteChannelFactory the factory to be used during output + */ + public Bound<T> withWritableByteChannelFactory( + WritableByteChannelFactory writableByteChannelFactory) { + return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + shardTemplate, validate, writableByteChannelFactory); } @Override @@ -654,11 +708,10 @@ public class TextIO { throw new IllegalStateException( "need to set the filename prefix of a TextIO.Write transform"); } - org.apache.beam.sdk.io.Write.Bound<T> write = org.apache.beam.sdk.io.Write.to( new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate, - coder)); + coder, writableByteChannelFactory)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -684,7 +737,10 @@ public class TextIO { .addIfNotNull(DisplayData.item("fileHeader", header) .withLabel("File Header")) .addIfNotNull(DisplayData.item("fileFooter", footer) - .withLabel("File Footer")); + .withLabel("File Footer")) + .add(DisplayData + .item("writableByteChannelFactory", writableByteChannelFactory.toString()) + .withLabel("Compression/Transformation Type")); } /** @@ -1018,8 +1074,9 @@ public class TextIO { TextSink( String baseOutputFilename, String extension, @Nullable String header, @Nullable String footer, - String fileNameTemplate, Coder<T> coder) { - super(baseOutputFilename, 
extension, fileNameTemplate); + String fileNameTemplate, Coder<T> coder, + WritableByteChannelFactory writableByteChannelFactory) { + super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); this.coder = coder; this.header = header; this.footer = footer; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java new file mode 100644 index 0000000..79f0996 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; +import org.apache.beam.sdk.util.MimeTypes; + +/** + * {@link WritableByteChannelFactory} implementation useful for testing that creates a + * {@link WritableByteChannel} that writes everything twice. + */ +public class DrunkWritableByteChannelFactory implements WritableByteChannelFactory { + @Override + public WritableByteChannel create(WritableByteChannel channel) throws IOException { + return new DrunkWritableByteChannel(channel); + } + + @Override + public String getMimeType() { + return MimeTypes.TEXT; + } + + @Override + public String getFilenameSuffix() { + return ".drunk"; + } + + @Override + public String toString() { + return "DRUNK"; + } + + /** + * WritableByteChannel that writes everything twice. + */ + private static class DrunkWritableByteChannel implements WritableByteChannel { + protected final WritableByteChannel channel; + + public DrunkWritableByteChannel(final WritableByteChannel channel) { + this.channel = channel; + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + final int w1 = channel.write(src); + src.rewind(); + final int w2 = channel.write(src); + return w1 + w2; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 0fdb11f..66bb661 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -26,20 +26,32 @@ import static org.junit.Assert.fail; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.zip.GZIPInputStream; + +import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention; +import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -419,6 +431,97 @@ public class FileBasedSinkTest { } /** + * {@link CompressionType#BZIP2} correctly writes BZip2-compressed data. + */ + @Test + public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException { + final File file = + writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123"); + // Read BZip2-compressed data back in using Apache commons API (de facto standard). 
+ assertReadValues(new BufferedReader(new InputStreamReader( + new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), + "abc", "123"); + } + + /** + * {@link CompressionType#GZIP} correctly writes Gzipped data. + */ + @Test + public void testCompressionTypeGZIP() throws FileNotFoundException, IOException { + final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123"); + // Read Gzipped data back in using standard API. + assertReadValues(new BufferedReader(new InputStreamReader( + new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", + "123"); + } + + /** + * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. + */ + @Test + public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException { + final File file = + writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123"); + // Read uncompressed data back in using standard API. + assertReadValues(new BufferedReader(new InputStreamReader( + new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc", + "123"); + } + + private void assertReadValues(final BufferedReader br, String... values) throws IOException { + try (final BufferedReader _br = br) { + for (String value : values) { + assertEquals(String.format("Line should read '%s'", value), value, _br.readLine()); + } + } + } + + private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory, + String... 
values) + throws IOException, FileNotFoundException { + final File file = tmpFolder.newFile("test.gz"); + final WritableByteChannel channel = + factory.create(Channels.newChannel(new FileOutputStream(file))); + for (String value : values) { + channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8))); + } + channel.close(); + return file; + } + + /** + * {@link FileBasedWriter} writes to the {@link WritableByteChannel} provided by + * {@link DrunkWritableByteChannelFactory}. + */ + @Test + public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { + final String testUid = "testId"; + final String expectedFilename = + getBaseOutputFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid; + final FileBasedWriter<String> writer = + new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory()) + .createWriteOperation(null).createWriter(null); + + final List<String> expected = new ArrayList<>(); + expected.add("header"); + expected.add("header"); + expected.add("a"); + expected.add("a"); + expected.add("b"); + expected.add("b"); + expected.add("footer"); + expected.add("footer"); + + writer.open(testUid); + writer.write("a"); + writer.write("b"); + final FileResult result = writer.close(); + + assertEquals(expectedFilename, result.getFilename()); + assertFileContains(expected, expectedFilename); + } + + /** * A simple FileBasedSink that writes String values as lines with header and footer lines. 
*/ private static final class SimpleSink extends FileBasedSink<String> { @@ -426,6 +529,11 @@ public class FileBasedSinkTest { super(baseOutputFilename, extension); } + public SimpleSink(String baseOutputFilename, String extension, + WritableByteChannelFactory writableByteChannelFactory) { + super(baseOutputFilename, extension, writableByteChannelFactory); + } + public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) { super(baseOutputFilename, extension, fileNamingTemplate); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index fdfb652..2131ece 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.TestUtils.INTS_ARRAY; +import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; @@ -47,6 +48,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; + import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; @@ -70,13 +72,16 @@ import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; + import javax.annotation.Nullable; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; 
import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; import org.apache.beam.sdk.io.TextIO.TextSource; import org.apache.beam.sdk.options.GcsOptions; @@ -170,7 +175,7 @@ public class TextIOTest { @BeforeClass public static void setupClass() throws IOException { IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions()); - tempFolder = Files.createTempDirectory("TextIOTest"); + tempFolder = Files.createTempDirectory("TextIOTest"); // empty files emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED); emptyGz = writeToFile(EMPTY, "empty.gz", GZIP); @@ -261,7 +266,7 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testReadNulls() throws Exception { - runTestRead(new Void[]{null, null, null}, VoidCoder.of()); + runTestRead(new Void[] {null, null, null}, VoidCoder.of()); } @Test @@ -342,6 +347,7 @@ public class TextIOTest { } else if (numShards > 0) { write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX); } + input.apply(write); p.run(); @@ -413,7 +419,7 @@ public class TextIOTest { } private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header, - final String footer) { + final String footer) { return new Function<List<String>, List<String>>() { @Nullable @Override @@ -498,6 +504,36 @@ public class TextIOTest { } @Test + @Category(NeedsRunner.class) + public void testWriteWithWritableByteChannelFactory() throws Exception { + Coder<String> coder = StringUtf8Coder.of(); + String outputName = "file.txt"; + Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); + Pipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)); + 
+ final WritableByteChannelFactory writableByteChannelFactory = + new DrunkWritableByteChannelFactory(); + TextIO.Write.Bound<String> write = TextIO.Write.to(baseDir.resolve(outputName).toString()) + .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory); + DisplayData displayData = DisplayData.from(write); + assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK")); + + input.apply(write); + + p.run(); + + final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2); + for (String elem : LINES2_ARRAY) { + drunkElems.add(elem + elem); + drunkElems.add(""); + } + assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, baseDir, + outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate()); + } + + @Test public void testWriteDisplayData() { TextIO.Write.Bound<?> write = TextIO.Write .to("foo") @@ -517,6 +553,7 @@ public class TextIOTest { assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); + assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED")); } @Test @@ -638,9 +675,9 @@ public class TextIOTest { } /** - * Tests reading from a small, uncompressed file with .gz extension. - * This must work in AUTO or GZIP modes. This is needed because some network file systems / HTTP - * clients will transparently decompress gzipped content. + * Tests reading from a small, uncompressed file with .gz extension. This must work in AUTO or + * GZIP modes. This is needed because some network file systems / HTTP clients will transparently + * decompress gzipped content. */ @Test @Category(NeedsRunner.class) @@ -672,9 +709,7 @@ public class TextIOTest { * @return The zip filename. * @throws Exception In case of a failure during zip file creation. 
*/ - private String createZipFile(List<String> expected, String filename, String[] - ... - fieldsEntries) + private String createZipFile(List<String> expected, String filename, String[]... fieldsEntries) throws Exception { File tmpFile = tempFolder.resolve(filename).toFile(); String tmpFileName = tmpFile.getPath(); @@ -703,7 +738,7 @@ public class TextIOTest { @Category(NeedsRunner.class) public void testTxtRead() throws Exception { // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes. - for (CompressionType type : new CompressionType[] { AUTO, UNCOMPRESSED }) { + for (CompressionType type : new CompressionType[]{AUTO, UNCOMPRESSED}) { assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY); assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE); @@ -714,7 +749,7 @@ public class TextIOTest { @Category(NeedsRunner.class) public void testGzipCompressedRead() throws Exception { // Files with the right extensions should work in AUTO and GZIP modes. - for (CompressionType type : new CompressionType[] { AUTO, GZIP }) { + for (CompressionType type : new CompressionType[]{AUTO, GZIP}) { assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY); assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE); @@ -732,7 +767,7 @@ public class TextIOTest { @Category(NeedsRunner.class) public void testBzip2CompressedRead() throws Exception { // Files with the right extensions should work in AUTO and BZIP2 modes. - for (CompressionType type : new CompressionType[] { AUTO, BZIP2 }) { + for (CompressionType type : new CompressionType[]{AUTO, BZIP2}) { assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY); assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY); assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE);