Repository: beam Updated Branches: refs/heads/master d4db4fb2c -> e2ef2d020
[BEAM-1433] Remove coder from TextIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/adba4c66 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/adba4c66 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/adba4c66 Branch: refs/heads/master Commit: adba4c660ef54b98055d30ee1ad7cbf440420030 Parents: d4db4fb Author: Aviem Zur <aviem...@gmail.com> Authored: Tue Feb 7 22:56:17 2017 +0200 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Feb 13 08:14:06 2017 -0800 ---------------------------------------------------------------------- .../examples/complete/TopWikipediaSessions.java | 25 +- .../FlinkStreamingTransformTranslators.java | 18 +- .../spark/translation/TransformTranslator.java | 12 +- .../java/org/apache/beam/sdk/io/TextIO.java | 383 +++++++------------ .../java/org/apache/beam/sdk/io/TextIOTest.java | 164 +++----- 5 files changed, 240 insertions(+), 362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 7eb80b7..4c07ca4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -18,9 +18,9 @@ package org.apache.beam.examples.complete; import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.TextIO; import 
org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -29,15 +29,18 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableComparator; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.CalendarWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -136,6 +139,17 @@ public class TopWikipediaSessions { } } + static class ParseTableRowJson extends SimpleFunction<String, TableRow> { + @Override + public TableRow apply(String input) { + try { + return Transport.getJsonFactory().fromString(input, TableRow.class); + } catch (IOException e) { + throw new RuntimeException("Failed parsing table row json", e); + } + } + } + static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> { private final double samplingThreshold; @@ -193,11 +207,10 @@ public class TopWikipediaSessions { double samplingThreshold = 0.1; - p.apply(TextIO.Read - .from(options.getInput()) - .withCoder(TableRowJsonCoder.of())) - .apply(new ComputeTopSessions(samplingThreshold)) - .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); + p.apply(TextIO.Read.from(options.getInput())) + .apply(MapElements.via(new ParseTableRowJson())) + 
.apply(new ComputeTopSessions(samplingThreshold)) + .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); p.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 1195c82..b9b5059 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -144,19 +144,18 @@ public class FlinkStreamingTransformTranslators { // Transformation Implementations // -------------------------------------------------------------------------------------------- - private static class TextIOWriteBoundStreamingTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - TextIO.Write.Bound<T>> { + private static class TextIOWriteBoundStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> { private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); @Override public void translateNode( - TextIO.Write.Bound<T> transform, + TextIO.Write.Bound transform, FlinkStreamingTranslationContext context) { PValue input = context.getInput(transform); - DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); + DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input); String filenamePrefix = transform.getFilenamePrefix(); String filenameSuffix = 
transform.getFilenameSuffix(); @@ -176,10 +175,13 @@ public class FlinkStreamingTransformTranslators { shardNameTemplate); DataStream<String> dataSink = inputDataStream - .flatMap(new FlatMapFunction<WindowedValue<T>, String>() { + .flatMap(new FlatMapFunction<WindowedValue<String>, String>() { @Override - public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception { - out.collect(value.getValue().toString()); + public void flatMap( + WindowedValue<String> value, + Collector<String> out) + throws Exception { + out.collect(value.getValue()); } }); DataStreamSink<String> output = http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index fa5ae95..f0e339a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -293,10 +293,10 @@ public final class TransformTranslator { } - private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() { - return new TransformEvaluator<TextIO.Read.Bound<T>>() { + private static <T> TransformEvaluator<TextIO.Read.Bound> readText() { + return new TransformEvaluator<TextIO.Read.Bound>() { @Override - public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) { + public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern) .map(WindowingHelpers.<String>windowFunction()); @@ -305,10 +305,10 @@ public final class 
TransformTranslator { }; } - private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() { - return new TransformEvaluator<TextIO.Write.Bound<T>>() { + private static <T> TransformEvaluator<TextIO.Write.Bound> writeText() { + return new TransformEvaluator<TextIO.Write.Bound>() { @Override - public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) { + public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaPairRDD<T, Void> last = ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD() http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 16b871e..726411c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -39,7 +39,6 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; @@ -65,12 +64,11 @@ import org.apache.beam.sdk.values.PDone; * filename or filename pattern of the form * {@code "gs://<bucket>/<filepath>"}). 
* - * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, - * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw - * bytes (split into lines delimited by '\n', '\r', or '\r\n') to another object of type {@code T}, - * supply a {@code Coder<T>} using {@link TextIO.Read#withCoder(Coder)}. + * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, + * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', + * '\r', or '\r\n'). * - * <p>See the following examples: + * <p>Example: * * <pre>{@code * Pipeline p = ...; @@ -78,12 +76,6 @@ import org.apache.beam.sdk.values.PDone; * // A simple Read of a local file (only runs locally): * PCollection<String> lines = * p.apply(TextIO.Read.from("/local/path/to/file.txt")); - * - * // A fully-specified Read from a GCS file: - * PCollection<Integer> numbers = - * p.apply("ReadNumbers", TextIO.Read - * .from("gs://my_bucket/path/to/numbers-*.txt") - * .withCoder(TextualIntegerCoder.of())); * }</pre> * * <p>To write a {@link PCollection} to one or more text files, use @@ -91,8 +83,7 @@ import org.apache.beam.sdk.values.PDone; * the path of the file to write to (e.g., a local filename or sharded * filename pattern if running locally, or a Google Cloud Storage * filename or sharded filename pattern of the form - * {@code "gs://<bucket>/<filepath>"}). You can use {@link TextIO.Write#withCoder(Coder)} - * to specify the {@link Coder} to use to encode the Java values into text lines. + * {@code "gs://<bucket>/<filepath>"}). * * <p>Any existing files with the same names as generated output files * will be overwritten. 
@@ -103,19 +94,10 @@ import org.apache.beam.sdk.values.PDone; * PCollection<String> lines = ...; * lines.apply(TextIO.Write.to("/path/to/file.txt")); * - * // A fully-specified Write to a sharded GCS file: - * PCollection<Integer> numbers = ...; - * numbers.apply("WriteNumbers", TextIO.Write - * .to("gs://my_bucket/path/to/numbers") - * .withSuffix(".txt") - * .withCoder(TextualIntegerCoder.of())); - * * // Same as above, only with Gzip compression: - * PCollection<Integer> numbers = ...; - * numbers.apply("WriteNumbers", TextIO.Write - * .to("gs://my_bucket/path/to/numbers") + * PCollection<String> lines = ...; + * lines.apply(TextIO.Write.to("/path/to/file.txt") * .withSuffix(".txt") - * .withCoder(TextualIntegerCoder.of()) * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * }</pre> */ @@ -126,9 +108,7 @@ public class TextIO { /** * A {@link PTransform} that reads from a text file (or multiple text * files matching a pattern) and returns a {@link PCollection} containing - * the decoding of each of the lines of the text file(s). The - * default decoding just returns each line as a {@link String}, but you may call - * {@link #withCoder(Coder)} to change the return type. + * the decoding of each of the lines of the text file(s) as a {@link String}. */ public static class Read { @@ -140,30 +120,15 @@ public class TextIO { * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported. */ - public static Bound<String> from(String filepattern) { - return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); + public static Bound from(String filepattern) { + return new Bound().from(filepattern); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. 
*/ - public static Bound<String> from(ValueProvider<String> filepattern) { - return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); - } - - /** - * Returns a transform for reading text files that uses the given - * {@code Coder<T>} to decode each of the lines of the file into a - * value of type {@code T}. - * - * <p>By default, uses {@link StringUtf8Coder}, which just - * returns the text lines as Java strings. - * - * @param <T> the type of the decoded elements, and the elements - * of the resulting PCollection - */ - public static <T> Bound<T> withCoder(Coder<T> coder) { - return new Bound<>(coder); + public static Bound from(ValueProvider<String> filepattern) { + return new Bound().from(filepattern); } /** @@ -174,8 +139,8 @@ public class TextIO { * exist at the pipeline creation time, but is expected to be * available at execution time. */ - public static Bound<String> withoutValidation() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation(); + public static Bound withoutValidation() { + return new Bound().withoutValidation(); } /** @@ -187,8 +152,8 @@ public class TextIO { * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are * uncompressed). */ - public static Bound<String> withCompressionType(TextIO.CompressionType compressionType) { - return new Bound<>(DEFAULT_TEXT_CODER).withCompressionType(compressionType); + public static Bound withCompressionType(TextIO.CompressionType compressionType) { + return new Bound().withCompressionType(compressionType); } // TODO: strippingNewlines, etc. @@ -196,33 +161,27 @@ public class TextIO { /** * A {@link PTransform} that reads from one or more text files and returns a bounded * {@link PCollection} containing one element for each line of the input files. - * - * @param <T> the type of each of the elements of the resulting - * {@link PCollection}. 
By default, each line is returned as a {@link String}, however you - * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a - * {@code PCollection<T>} instead. */ - public static class Bound<T> extends PTransform<PBegin, PCollection<T>> { + public static class Bound extends PTransform<PBegin, PCollection<String>> { /** The filepattern to read from. */ @Nullable private final ValueProvider<String> filepattern; - /** The Coder to use to decode each line. */ - private final Coder<T> coder; - /** An option to indicate if input validation is desired. Default is true. */ private final boolean validate; /** Option to indicate the input source's compression type. Default is AUTO. */ private final TextIO.CompressionType compressionType; - Bound(Coder<T> coder) { - this(null, null, coder, true, TextIO.CompressionType.AUTO); + private Bound() { + this(null, null, true, TextIO.CompressionType.AUTO); } - private Bound(@Nullable String name, @Nullable ValueProvider<String> filepattern, - Coder<T> coder, boolean validate, TextIO.CompressionType compressionType) { + private Bound( + @Nullable String name, + @Nullable ValueProvider<String> filepattern, + boolean validate, + TextIO.CompressionType compressionType) { super(name); - this.coder = coder; this.filepattern = filepattern; this.validate = validate; this.compressionType = compressionType; @@ -236,32 +195,18 @@ public class TextIO { * <p>Does not modify this object. */ - public Bound<T> from(String filepattern) { + public Bound from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound<>(name, StaticValueProvider.of(filepattern), coder, validate, + return new Bound(name, StaticValueProvider.of(filepattern), validate, compressionType); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. 
*/ - public Bound<T> from(ValueProvider<String> filepattern) { + public Bound from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound<>(name, filepattern, coder, validate, compressionType); - } - - /** - * Returns a new transform for reading from text files that's like this one but - * that uses the given {@link Coder Coder<X>} to decode each of the - * lines of the file into a value of type {@code X}. - * - * <p>Does not modify this object. - * - * @param <X> the type of the decoded elements, and the - * elements of the resulting PCollection - */ - public <X> Bound<X> withCoder(Coder<X> coder) { - return new Bound<>(name, filepattern, coder, validate, compressionType); + return new Bound(name, filepattern, validate, compressionType); } /** @@ -274,8 +219,8 @@ public class TextIO { * * <p>Does not modify this object. */ - public Bound<T> withoutValidation() { - return new Bound<>(name, filepattern, coder, false, compressionType); + public Bound withoutValidation() { + return new Bound(name, filepattern, false, compressionType); } /** @@ -287,12 +232,12 @@ public class TextIO { * * <p>Does not modify this object. 
*/ - public Bound<T> withCompressionType(TextIO.CompressionType compressionType) { - return new Bound<>(name, filepattern, coder, validate, compressionType); + public Bound withCompressionType(TextIO.CompressionType compressionType) { + return new Bound(name, filepattern, validate, compressionType); } @Override - public PCollection<T> expand(PBegin input) { + public PCollection<String> expand(PBegin input) { if (filepattern == null) { throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform"); } @@ -310,31 +255,31 @@ public class TextIO { } } - final Bounded<T> read = org.apache.beam.sdk.io.Read.from(getSource()); - PCollection<T> pcol = input.getPipeline().apply("Read", read); + final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource()); + PCollection<String> pcol = input.getPipeline().apply("Read", read); // Honor the default output coder that would have been used by this PTransform. pcol.setCoder(getDefaultOutputCoder()); return pcol; } // Helper to create a source specific to the requested compression type. 
- protected FileBasedSource<T> getSource() { + protected FileBasedSource<String> getSource() { switch (compressionType) { case UNCOMPRESSED: - return new TextSource<T>(filepattern, coder); + return new TextSource(filepattern); case AUTO: - return CompressedSource.from(new TextSource<T>(filepattern, coder)); + return CompressedSource.from(new TextSource(filepattern)); case BZIP2: return - CompressedSource.from(new TextSource<T>(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.BZIP2); case GZIP: return - CompressedSource.from(new TextSource<T>(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.GZIP); case ZIP: return - CompressedSource.from(new TextSource<T>(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.ZIP); default: throw new IllegalArgumentException("Unknown compression type: " + compressionType); @@ -357,8 +302,8 @@ public class TextIO { } @Override - protected Coder<T> getDefaultOutputCoder() { - return coder; + protected Coder<String> getDefaultOutputCoder() { + return DEFAULT_TEXT_CODER; } public String getFilepattern() { @@ -399,23 +344,23 @@ public class TextIO { * a shard identifier (see {@link Bound#withNumShards(int)}, and end * in a common extension, if given by {@link Bound#withSuffix(String)}. */ - public static Bound<String> to(String prefix) { - return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + public static Bound to(String prefix) { + return new Bound().to(prefix); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. 
*/ - public static Bound<String> to(ValueProvider<String> prefix) { - return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + public static Bound to(ValueProvider<String> prefix) { + return new Bound().to(prefix); } /** * Returns a transform for writing to text files that appends the specified suffix * to the created files. */ - public static Bound<String> withSuffix(String nameExtension) { - return new Bound<>(DEFAULT_TEXT_CODER).withSuffix(nameExtension); + public static Bound withSuffix(String nameExtension) { + return new Bound().withSuffix(nameExtension); } /** @@ -428,8 +373,8 @@ public class TextIO { * @param numShards the number of shards to use, or 0 to let the system * decide. */ - public static Bound<String> withNumShards(int numShards) { - return new Bound<>(DEFAULT_TEXT_CODER).withNumShards(numShards); + public static Bound withNumShards(int numShards) { + return new Bound().withNumShards(numShards); } /** @@ -438,30 +383,16 @@ public class TextIO { * * <p>See {@link ShardNameTemplate} for a description of shard templates. */ - public static Bound<String> withShardNameTemplate(String shardTemplate) { - return new Bound<>(DEFAULT_TEXT_CODER).withShardNameTemplate(shardTemplate); + public static Bound withShardNameTemplate(String shardTemplate) { + return new Bound().withShardNameTemplate(shardTemplate); } /** * Returns a transform for writing to text files that forces a single file as * output. */ - public static Bound<String> withoutSharding() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutSharding(); - } - - /** - * Returns a transform for writing to text files that uses the given - * {@link Coder} to encode each of the elements of the input - * {@link PCollection} into an output text line. - * - * <p>By default, uses {@link StringUtf8Coder}, which writes input - * Java strings directly as output lines. 
- * - * @param <T> the type of the elements of the input {@link PCollection} - */ - public static <T> Bound<T> withCoder(Coder<T> coder) { - return new Bound<>(coder); + public static Bound withoutSharding() { + return new Bound().withoutSharding(); } /** @@ -472,8 +403,8 @@ public class TextIO { * not exist at the pipeline creation time, but is expected to be available * at execution time. */ - public static Bound<String> withoutValidation() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation(); + public static Bound withoutValidation() { + return new Bound().withoutValidation(); } /** @@ -484,8 +415,8 @@ public class TextIO { * * @param header the string to be added as file header */ - public static Bound<String> withHeader(@Nullable String header) { - return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header); + public static Bound withHeader(@Nullable String header) { + return new Bound().withHeader(header); } /** @@ -496,8 +427,8 @@ public class TextIO { * * @param footer the string to be added as file footer */ - public static Bound<String> withFooter(@Nullable String footer) { - return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer); + public static Bound withFooter(@Nullable String footer) { + return new Bound().withFooter(footer); } /** @@ -509,10 +440,9 @@ public class TextIO { * * @param writableByteChannelFactory the factory to be used during output */ - public static Bound<String> withWritableByteChannelFactory( + public static Bound withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { - return new Bound<>(DEFAULT_TEXT_CODER) - .withWritableByteChannelFactory(writableByteChannelFactory); + return new Bound().withWritableByteChannelFactory(writableByteChannelFactory); } // TODO: appendingNewlines, etc. 
@@ -521,10 +451,8 @@ public class TextIO { * A PTransform that writes a bounded PCollection to a text file (or * multiple text files matching a sharding pattern), with each * PCollection element being encoded into its own line. - * - * @param <T> the type of the elements of the input PCollection */ - public static class Bound<T> extends PTransform<PCollection<T>, PDone> { + public static class Bound extends PTransform<PCollection<String>, PDone> { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The prefix of each file written, combined with suffix and shardTemplate. */ @@ -538,9 +466,6 @@ public class TextIO { /** An optional footer to add to each file. */ @Nullable private final String footer; - /** The Coder to use to decode each line. */ - private final Coder<T> coder; - /** Requested number of shards. 0 for automatic. */ private final int numShards; @@ -556,19 +481,18 @@ public class TextIO { */ private final WritableByteChannelFactory writableByteChannelFactory; - Bound(Coder<T> coder) { - this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true, + private Bound() { + this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true, FileBasedSink.CompressionType.UNCOMPRESSED); } private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix, - @Nullable String header, @Nullable String footer, Coder<T> coder, int numShards, + @Nullable String header, @Nullable String footer, int numShards, String shardTemplate, boolean validate, WritableByteChannelFactory writableByteChannelFactory) { super(name); this.header = header; this.footer = footer; - this.coder = coder; this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; @@ -586,18 +510,18 @@ public class TextIO { * * <p>Does not modify this object. 
*/ - public Bound<T> to(String filenamePrefix) { + public Bound to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, - header, footer, coder, numShards, shardTemplate, validate, + return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, + header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. */ - public Bound<T> to(ValueProvider<String> filenamePrefix) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound to(ValueProvider<String> filenamePrefix) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -609,9 +533,9 @@ public class TextIO { * * @see ShardNameTemplate */ - public Bound<T> withSuffix(String nameExtension) { + public Bound withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -629,9 +553,9 @@ public class TextIO { * decide. 
* @see ShardNameTemplate */ - public Bound<T> withNumShards(int numShards) { + public Bound withNumShards(int numShards) { checkArgument(numShards >= 0); - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -643,8 +567,8 @@ public class TextIO { * * @see ShardNameTemplate */ - public Bound<T> withShardNameTemplate(String shardTemplate) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withShardNameTemplate(String shardTemplate) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -661,25 +585,12 @@ public class TextIO { * * <p>Does not modify this object. */ - public Bound<T> withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "", + public Bound withoutSharding() { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "", validate, writableByteChannelFactory); } /** - * Returns a transform for writing to text files that's like this one - * but that uses the given {@link Coder Coder<X>} to encode each of - * the elements of the input {@link PCollection PCollection<X>} into an - * output text line. Does not modify this object. - * - * @param <X> the type of the elements of the input {@link PCollection} - */ - public <X> Bound<X> withCoder(Coder<X> coder) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate, writableByteChannelFactory); - } - - /** * Returns a transform for writing to text files that's like this one but * that has GCS output path validation on pipeline creation disabled. * @@ -689,8 +600,8 @@ public class TextIO { * * <p>Does not modify this object. 
*/ - public Bound<T> withoutValidation() { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withoutValidation() { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, false, writableByteChannelFactory); } @@ -704,8 +615,8 @@ public class TextIO { * * @param header the string to be added as file header */ - public Bound<T> withHeader(@Nullable String header) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withHeader(@Nullable String header) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -719,8 +630,8 @@ public class TextIO { * * @param footer the string to be added as file footer */ - public Bound<T> withFooter(@Nullable String footer) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withFooter(@Nullable String footer) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -735,22 +646,22 @@ public class TextIO { * * @param writableByteChannelFactory the factory to be used during output */ - public Bound<T> withWritableByteChannelFactory( + public Bound withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @Override - public PDone expand(PCollection<T> input) { + public PDone expand(PCollection<String> input) { if (filenamePrefix == null) { throw new IllegalStateException( "need to set the filename prefix of a TextIO.Write transform"); } - org.apache.beam.sdk.io.Write.Bound<T> write = + 
org.apache.beam.sdk.io.Write.Bound<String> write = org.apache.beam.sdk.io.Write.to( - new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate, - coder, writableByteChannelFactory)); + new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate, + writableByteChannelFactory)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -812,10 +723,6 @@ public class TextIO { return filenameSuffix; } - public Coder<T> getCoder() { - return coder; - } - @Nullable public String getHeader() { return header; @@ -901,40 +808,38 @@ public class TextIO { * representing the beginning of the first record to be decoded. */ @VisibleForTesting - static class TextSource<T> extends FileBasedSource<T> { + static class TextSource extends FileBasedSource<String> { /** The Coder to use to decode each line. */ - private final Coder<T> coder; - @VisibleForTesting - TextSource(String fileSpec, Coder<T> coder) { + TextSource(String fileSpec) { super(fileSpec, 1L); - this.coder = coder; } @VisibleForTesting - TextSource(ValueProvider<String> fileSpec, Coder<T> coder) { + TextSource(ValueProvider<String> fileSpec) { super(fileSpec, 1L); - this.coder = coder; } - private TextSource(String fileName, long start, long end, Coder<T> coder) { + private TextSource(String fileName, long start, long end) { super(fileName, 1L, start, end); - this.coder = coder; } @Override - protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) { - return new TextSource<>(fileName, start, end, coder); + protected FileBasedSource<String> createForSubrangeOfFile( + String fileName, + long start, + long end) { + return new TextSource(fileName, start, end); } @Override - protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) { - return new TextBasedReader<>(this); + protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) { + return new TextBasedReader(this); } @Override - public 
Coder<T> getDefaultOutputCoder() { - return coder; + public Coder<String> getDefaultOutputCoder() { + return DEFAULT_TEXT_CODER; } /** @@ -944,9 +849,8 @@ public class TextIO { * <p>See {@link TextSource} for further details. */ @VisibleForTesting - static class TextBasedReader<T> extends FileBasedReader<T> { + static class TextBasedReader extends FileBasedReader<String> { private static final int READ_BUFFER_SIZE = 8192; - private final Coder<T> coder; private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteString buffer; private int startOfSeparatorInBuffer; @@ -955,12 +859,11 @@ public class TextIO { private volatile long startOfNextRecord; private volatile boolean eof; private volatile boolean elementIsPresent; - private T currentValue; + private String currentValue; private ReadableByteChannel inChannel; - private TextBasedReader(TextSource<T> source) { + private TextBasedReader(TextSource source) { super(source); - coder = source.coder; buffer = ByteString.EMPTY; } @@ -981,7 +884,7 @@ public class TextIO { } @Override - public T getCurrent() throws NoSuchElementException { + public String getCurrent() throws NoSuchElementException { if (!elementIsPresent) { throw new NoSuchElementException(); } @@ -1078,7 +981,7 @@ public class TextIO { */ private void decodeCurrentElement() throws IOException { ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); - currentValue = coder.decode(dataToDecode.newInput(), Context.OUTER); + currentValue = dataToDecode.toStringUtf8(); elementIsPresent = true; buffer = buffer.substring(endOfSeparatorInBuffer); } @@ -1107,48 +1010,46 @@ public class TextIO { * Each record (including the last) is terminated. 
*/ @VisibleForTesting - static class TextSink<T> extends FileBasedSink<T> { - private final Coder<T> coder; + static class TextSink extends FileBasedSink<String> { @Nullable private final String header; @Nullable private final String footer; @VisibleForTesting TextSink( - ValueProvider<String> baseOutputFilename, String extension, - @Nullable String header, @Nullable String footer, - String fileNameTemplate, Coder<T> coder, + ValueProvider<String> baseOutputFilename, + String extension, + @Nullable String header, + @Nullable String footer, + String fileNameTemplate, WritableByteChannelFactory writableByteChannelFactory) { super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); - this.coder = coder; this.header = header; this.footer = footer; } @Override - public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new TextWriteOperation<>(this, coder, header, footer); + public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation( + PipelineOptions options) { + return new TextWriteOperation(this, header, footer); } /** * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation * FileBasedWriteOperation} for text files. 
*/ - private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { - private final Coder<T> coder; + private static class TextWriteOperation extends FileBasedWriteOperation<String> { @Nullable private final String header; @Nullable private final String footer; - private TextWriteOperation(TextSink<T> sink, Coder<T> coder, - @Nullable String header, @Nullable String footer) { + private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) { super(sink); - this.coder = coder; this.header = header; this.footer = footer; } @Override - public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new TextWriter<>(this, coder, header, footer); + public FileBasedWriter createWriter(PipelineOptions options) throws Exception { + return new TextWriter(this, header, footer); } } @@ -1156,35 +1057,42 @@ public class TextIO { * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} * for text files. */ - private static class TextWriter<T> extends FileBasedWriter<T> { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final Coder<T> coder; + private static class TextWriter extends FileBasedWriter<String> { + private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; - private OutputStream out; + private OutputStreamWriter out; - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, - @Nullable String header, @Nullable String footer) { + public TextWriter( + FileBasedWriteOperation<String> writeOperation, + @Nullable String header, + @Nullable String footer) { super(writeOperation); this.header = header; this.footer = footer; this.mimeType = MimeTypes.TEXT; - this.coder = coder; } /** - * Writes {@code value} followed by a newline if {@code value} is not null. + * Writes {@code value} followed by a newline character if {@code value} is not null. 
*/ private void writeIfNotNull(@Nullable String value) throws IOException { if (value != null) { - out.write(value.getBytes(StandardCharsets.UTF_8)); - out.write(NEWLINE); + writeLine(value); } } + /** + * Writes {@code value} followed by newline character. + */ + private void writeLine(String value) throws IOException { + out.write(value); + out.write(NEWLINE); + } + @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = Channels.newOutputStream(channel); + out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8); } @Override @@ -1195,12 +1103,13 @@ public class TextIO { @Override protected void writeFooter() throws Exception { writeIfNotNull(footer); + // Flush here because there is currently no other natural place to do this. [BEAM-1465] + out.flush(); } @Override - public void write(T value) throws Exception { - coder.encode(value, out, Context.OUTER); - out.write(NEWLINE); + public void write(String value) throws Exception { + writeLine(value); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index b8b28eb..6304603 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.io; -import static org.apache.beam.sdk.TestUtils.INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; -import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO; import static 
org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2; @@ -79,8 +77,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.TextualIntegerCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; @@ -215,28 +211,21 @@ public class TextIOTest { }); } - private <T> void runTestRead(T[] expected, Coder<T> coder) throws Exception { + private <T> void runTestRead(String[] expected) throws Exception { File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); String filename = tmpFile.getPath(); try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { - for (T elem : expected) { - byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); + for (String elem : expected) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem); writer.println(line); } } - TextIO.Read.Bound<T> read; - if (coder.equals(StringUtf8Coder.of())) { - TextIO.Read.Bound<String> readStrings = TextIO.Read.from(filename); - // T==String - read = (TextIO.Read.Bound<T>) readStrings; - } else { - read = TextIO.Read.from(filename).withCoder(coder); - } + TextIO.Read.Bound read = TextIO.Read.from(filename); - PCollection<T> output = p.apply(read); + PCollection<String> output = p.apply(read); PAssert.that(output).containsInAnyOrder(expected); p.run(); @@ -245,31 +234,13 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testReadStrings() throws Exception { - runTestRead(LINES_ARRAY, StringUtf8Coder.of()); + runTestRead(LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testReadEmptyStrings() throws Exception { - runTestRead(NO_LINES_ARRAY, 
StringUtf8Coder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testReadInts() throws Exception { - runTestRead(INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testReadEmptyInts() throws Exception { - runTestRead(NO_INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testReadNulls() throws Exception { - runTestRead(new Void[] {null, null, null}, VoidCoder.of()); + runTestRead(NO_LINES_ARRAY); } @Test @@ -286,7 +257,7 @@ public class TextIOTest { @Test public void testReadDisplayData() { - TextIO.Read.Bound<?> read = TextIO.Read + TextIO.Read.Bound read = TextIO.Read .from("foo.*") .withCompressionType(BZIP2) .withoutValidation(); @@ -303,7 +274,7 @@ public class TextIOTest { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - TextIO.Read.Bound<String> read = TextIO.Read + TextIO.Read.Bound read = TextIO.Read .from("foobar") .withoutValidation(); @@ -312,36 +283,32 @@ public class TextIOTest { displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); } - private <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception { - runTestWrite(elems, null, null, coder, 1); + private void runTestWrite(String[] elems) throws Exception { + runTestWrite(elems, null, null, 1); } - private <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception { - runTestWrite(elems, null, null, coder, numShards); + private void runTestWrite(String[] elems, int numShards) throws Exception { + runTestWrite(elems, null, null, numShards); } - private <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) + private void runTestWrite(String[] elems, String header, String footer) throws Exception { - runTestWrite(elems, header, footer, coder, 1); + runTestWrite(elems, header, footer, 1); } - private <T> void runTestWrite( - T[] elems, String header, String 
footer, Coder<T> coder, int numShards) throws Exception { + private void runTestWrite( + String[] elems, String header, String footer, int numShards) throws Exception { String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); String baseFilename = baseDir.resolve(outputName).toString(); - PCollection<T> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder)); + PCollection<String> input = + p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); - TextIO.Write.Bound<T> write; - if (coder.equals(StringUtf8Coder.of())) { - TextIO.Write.Bound<String> writeStrings = TextIO.Write.to(baseFilename); - // T==String - write = (TextIO.Write.Bound<T>) writeStrings; - } else { - write = TextIO.Write.to(baseFilename).withCoder(coder); - } - write = write.withHeader(header).withFooter(footer); + TextIO.Write.Bound write = + TextIO.Write.to(baseFilename) + .withHeader(header) + .withFooter(footer); if (numShards == 1) { write = write.withoutSharding(); @@ -353,15 +320,14 @@ public class TextIOTest { p.run(); - assertOutputFiles(elems, header, footer, coder, numShards, baseDir, outputName, + assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, write.getShardNameTemplate()); } - public static <T> void assertOutputFiles( - T[] elems, + public static void assertOutputFiles( + String[] elems, final String header, final String footer, - Coder<T> coder, int numShards, Path rootLocation, String outputName, @@ -400,8 +366,8 @@ public class TextIOTest { } List<String> expectedElements = new ArrayList<>(elems.length); - for (T elem : elems) { - byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); + for (String elem : elems) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem); expectedElements.add(line); } @@ -453,55 +419,43 @@ public class TextIOTest { @Test @Category(NeedsRunner.class) public void testWriteStrings() throws 
Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of()); + runTestWrite(LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsNoSharding() throws Exception { - runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of(), 0); + runTestWrite(NO_LINES_ARRAY, 0); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStrings() throws Exception { - runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testWriteInts() throws Exception { - runTestWrite(INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testWriteEmptyInts() throws Exception { - runTestWrite(NO_INTS_ARRAY, TextualIntegerCoder.of()); + runTestWrite(NO_LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5); + runTestWrite(LINES_ARRAY, 5); } @Test @Category(NeedsRunner.class) public void testWriteWithHeader() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null); + runTestWrite(LINES_ARRAY, MY_HEADER, null); } @Test @Category(NeedsRunner.class) public void testWriteWithFooter() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER); + runTestWrite(LINES_ARRAY, null, MY_FOOTER); } @Test @Category(NeedsRunner.class) public void testWriteWithHeaderAndFooter() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER); + runTestWrite(LINES_ARRAY, MY_HEADER, MY_FOOTER); } @Test @@ -515,7 +469,7 @@ public class TextIOTest { final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); - TextIO.Write.Bound<String> write = TextIO.Write.to(baseDir.resolve(outputName).toString()) + TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString()) .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory); DisplayData 
displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK")); @@ -526,16 +480,16 @@ public class TextIOTest { final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2); for (String elem : LINES2_ARRAY) { - drunkElems.add(elem + elem); - drunkElems.add(""); + drunkElems.add(elem); + drunkElems.add(elem); } - assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, baseDir, + assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir, outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate()); } @Test public void testWriteDisplayData() { - TextIO.Write.Bound<?> write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") @@ -558,7 +512,7 @@ public class TextIOTest { @Test public void testWriteDisplayDataValidateThenHeader() { - TextIO.Write.Bound<?> write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withHeader("myHeader"); @@ -570,7 +524,7 @@ public class TextIOTest { @Test public void testWriteDisplayDataValidateThenFooter() { - TextIO.Write.Bound<?> write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withFooter("myFooter"); @@ -590,7 +544,7 @@ public class TextIOTest { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - TextIO.Write.Bound<?> write = TextIO.Write.to(outputPath); + TextIO.Write.Bound write = TextIO.Write.to(outputPath); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("TextIO.Write should include the file prefix in its primitive display data", @@ -649,21 +603,21 @@ public class TextIOTest { @Test public void testReadWithoutValidationFlag() throws Exception { - TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/foo*/baz"); + TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/foo*/baz"); assertTrue(read.needsValidation()); 
assertFalse(read.withoutValidation().needsValidation()); } @Test public void testWriteWithoutValidationFlag() throws Exception { - TextIO.Write.Bound<String> write = TextIO.Write.to("gs://bucket/foo/baz"); + TextIO.Write.Bound write = TextIO.Write.to("gs://bucket/foo/baz"); assertTrue(write.needsValidation()); assertFalse(write.withoutValidation().needsValidation()); } @Test public void testCompressionTypeIsSet() throws Exception { - TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/test"); + TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/test"); assertEquals(AUTO, read.getCompressionType()); read = TextIO.Read.from("gs://bucket/test").withCompressionType(GZIP); assertEquals(GZIP, read.getCompressionType()); @@ -688,7 +642,7 @@ public class TextIOTest { private void assertReadingCompressedFileMatchesExpected( File file, CompressionType compressionType, String[] expected) { - TextIO.Read.Bound<String> read = + TextIO.Read.Bound read = TextIO.Read.from(file.getPath()).withCompressionType(compressionType); PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read); @@ -1064,74 +1018,74 @@ public class TextIOTest { } private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception { - TextSource<String> source = prepareSource(data); + TextSource source = prepareSource(data); List<String> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); assertThat(actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); } @Test public void testSplittingSourceWithEmptyLines() throws Exception { - TextSource<String> source = prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiter() throws Exception { - 
TextSource<String> source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiter() throws Exception { - TextSource<String> source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() throws Exception { - TextSource<String> source = prepareSource( + TextSource source = prepareSource( "asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimiters() throws Exception { - TextSource<String> source = prepareSource( + TextSource source = prepareSource( "asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource<String> source = prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource<String> source = prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8)); 
SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource<String> source = prepareSource( + TextSource source = prepareSource( "asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Exception { - TextSource<String> source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } - private TextSource<String> prepareSource(byte[] data) throws IOException { + private TextSource prepareSource(byte[] data) throws IOException { Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); Files.write(path, data); - return new TextSource<>(path.toString(), StringUtf8Coder.of()); + return new TextSource(path.toString()); } @Test