Repository: beam
Updated Branches:
  refs/heads/master d4db4fb2c -> e2ef2d020


[BEAM-1433] Remove coder from TextIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/adba4c66
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/adba4c66
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/adba4c66

Branch: refs/heads/master
Commit: adba4c660ef54b98055d30ee1ad7cbf440420030
Parents: d4db4fb
Author: Aviem Zur <aviem...@gmail.com>
Authored: Tue Feb 7 22:56:17 2017 +0200
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Feb 13 08:14:06 2017 -0800

----------------------------------------------------------------------
 .../examples/complete/TopWikipediaSessions.java |  25 +-
 .../FlinkStreamingTransformTranslators.java     |  18 +-
 .../spark/translation/TransformTranslator.java  |  12 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 383 +++++++------------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 164 +++-----
 5 files changed, 240 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
 
b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 7eb80b7..4c07ca4 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -18,9 +18,9 @@
 package org.apache.beam.examples.complete;
 
 import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -29,15 +29,18 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableComparator;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -136,6 +139,17 @@ public class TopWikipediaSessions {
     }
   }
 
+  static class ParseTableRowJson extends SimpleFunction<String, TableRow> {
+    @Override
+    public TableRow apply(String input) {
+      try {
+        return Transport.getJsonFactory().fromString(input, TableRow.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed parsing table row json", e);
+      }
+    }
+  }
+
   static class ComputeTopSessions extends PTransform<PCollection<TableRow>, 
PCollection<String>> {
 
     private final double samplingThreshold;
@@ -193,11 +207,10 @@ public class TopWikipediaSessions {
 
     double samplingThreshold = 0.1;
 
-    p.apply(TextIO.Read
-        .from(options.getInput())
-        .withCoder(TableRowJsonCoder.of()))
-     .apply(new ComputeTopSessions(samplingThreshold))
-     .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
+    p.apply(TextIO.Read.from(options.getInput()))
+        .apply(MapElements.via(new ParseTableRowJson()))
+        .apply(new ComputeTopSessions(samplingThreshold))
+        .apply("Write", 
TextIO.Write.withoutSharding().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 1195c82..b9b5059 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -144,19 +144,18 @@ public class FlinkStreamingTransformTranslators {
   //  Transformation Implementations
   // 
--------------------------------------------------------------------------------------------
 
-  private static class TextIOWriteBoundStreamingTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-        TextIO.Write.Bound<T>> {
+  private static class TextIOWriteBoundStreamingTranslator
+      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
 
     private static final Logger LOG =
         LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
 
     @Override
     public void translateNode(
-        TextIO.Write.Bound<T> transform,
+        TextIO.Write.Bound transform,
         FlinkStreamingTranslationContext context) {
       PValue input = context.getInput(transform);
-      DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
+      DataStream<WindowedValue<String>> inputDataStream = 
context.getInputDataStream(input);
 
       String filenamePrefix = transform.getFilenamePrefix();
       String filenameSuffix = transform.getFilenameSuffix();
@@ -176,10 +175,13 @@ public class FlinkStreamingTransformTranslators {
           shardNameTemplate);
 
       DataStream<String> dataSink = inputDataStream
-          .flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
+          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
             @Override
-            public void flatMap(WindowedValue<T> value, Collector<String> out) 
throws Exception {
-              out.collect(value.getValue().toString());
+            public void flatMap(
+                WindowedValue<String> value,
+                Collector<String> out)
+                throws Exception {
+              out.collect(value.getValue());
             }
           });
       DataStreamSink<String> output =

http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index fa5ae95..f0e339a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -293,10 +293,10 @@ public final class TransformTranslator {
   }
 
 
-  private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() {
-    return new TransformEvaluator<TextIO.Read.Bound<T>>() {
+  private static <T> TransformEvaluator<TextIO.Read.Bound> readText() {
+    return new TransformEvaluator<TextIO.Read.Bound>() {
       @Override
-      public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext 
context) {
+      public void evaluate(TextIO.Read.Bound transform, EvaluationContext 
context) {
         String pattern = transform.getFilepattern();
         JavaRDD<WindowedValue<String>> rdd = 
context.getSparkContext().textFile(pattern)
             .map(WindowingHelpers.<String>windowFunction());
@@ -305,10 +305,10 @@ public final class TransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
-    return new TransformEvaluator<TextIO.Write.Bound<T>>() {
+  private static <T> TransformEvaluator<TextIO.Write.Bound> writeText() {
+    return new TransformEvaluator<TextIO.Write.Bound>() {
       @Override
-      public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext 
context) {
+      public void evaluate(TextIO.Write.Bound transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
         JavaPairRDD<T, Void> last =
             ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()

http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 16b871e..726411c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -39,7 +39,6 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
@@ -65,12 +64,11 @@ import org.apache.beam.sdk.values.PDone;
  * filename or filename pattern of the form
  * {@code "gs://<bucket>/<filepath>"}).
  *
- * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link 
String Strings},
- * each corresponding to one line of an input UTF-8 text file. To convert 
directly from the raw
- * bytes (split into lines delimited by '\n', '\r', or '\r\n') to another 
object of type {@code T},
- * supply a {@code Coder<T>} using {@link TextIO.Read#withCoder(Coder)}.
+ * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String 
Strings},
+ * each corresponding to one line of an input UTF-8 text file (split into 
lines delimited by '\n',
+ * '\r', or '\r\n').
  *
- * <p>See the following examples:
+ * <p>Example:
  *
  * <pre>{@code
  * Pipeline p = ...;
@@ -78,12 +76,6 @@ import org.apache.beam.sdk.values.PDone;
  * // A simple Read of a local file (only runs locally):
  * PCollection<String> lines =
  *     p.apply(TextIO.Read.from("/local/path/to/file.txt"));
- *
- * // A fully-specified Read from a GCS file:
- * PCollection<Integer> numbers =
- *     p.apply("ReadNumbers", TextIO.Read
- *         .from("gs://my_bucket/path/to/numbers-*.txt")
- *         .withCoder(TextualIntegerCoder.of()));
  * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more text files, use
@@ -91,8 +83,7 @@ import org.apache.beam.sdk.values.PDone;
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can use {@link 
TextIO.Write#withCoder(Coder)}
- * to specify the {@link Coder} to use to encode the Java values into text 
lines.
+ * {@code "gs://<bucket>/<filepath>"}).
  *
  * <p>Any existing files with the same names as generated output files
  * will be overwritten.
@@ -103,19 +94,10 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = ...;
  * lines.apply(TextIO.Write.to("/path/to/file.txt"));
  *
- * // A fully-specified Write to a sharded GCS file:
- * PCollection<Integer> numbers = ...;
- * numbers.apply("WriteNumbers", TextIO.Write
- *      .to("gs://my_bucket/path/to/numbers")
- *      .withSuffix(".txt")
- *      .withCoder(TextualIntegerCoder.of()));
- *
  * // Same as above, only with Gzip compression:
- * PCollection<Integer> numbers = ...;
- * numbers.apply("WriteNumbers", TextIO.Write
- *      .to("gs://my_bucket/path/to/numbers")
+ * PCollection<String> lines = ...;
+ * lines.apply(TextIO.Write.to("/path/to/file.txt")
  *      .withSuffix(".txt")
- *      .withCoder(TextualIntegerCoder.of())
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
  */
@@ -126,9 +108,7 @@ public class TextIO {
   /**
    * A {@link PTransform} that reads from a text file (or multiple text
    * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each of the lines of the text file(s). The
-   * default decoding just returns each line as a {@link String}, but you may 
call
-   * {@link #withCoder(Coder)} to change the return type.
+   * the decoding of each of the lines of the text file(s) as a {@link String}.
    */
   public static class Read {
 
@@ -140,30 +120,15 @@ public class TextIO {
      * service). Standard <a 
href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
      * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
-    public static Bound<String> from(String filepattern) {
-      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
+    public static Bound from(String filepattern) {
+      return new Bound().from(filepattern);
     }
 
     /**
      * Same as {@code from(filepattern)}, but accepting a {@link 
ValueProvider}.
      */
-    public static Bound<String> from(ValueProvider<String> filepattern) {
-      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
-    }
-
-    /**
-     * Returns a transform for reading text files that uses the given
-     * {@code Coder<T>} to decode each of the lines of the file into a
-     * value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
+    public static Bound from(ValueProvider<String> filepattern) {
+      return new Bound().from(filepattern);
     }
 
     /**
@@ -174,8 +139,8 @@ public class TextIO {
      * exist at the pipeline creation time, but is expected to be
      * available at execution time.
      */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
+    public static Bound withoutValidation() {
+      return new Bound().withoutValidation();
     }
 
     /**
@@ -187,8 +152,8 @@ public class TextIO {
      * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other 
extensions are
      * uncompressed).
      */
-    public static Bound<String> withCompressionType(TextIO.CompressionType 
compressionType) {
-      return new 
Bound<>(DEFAULT_TEXT_CODER).withCompressionType(compressionType);
+    public static Bound withCompressionType(TextIO.CompressionType 
compressionType) {
+      return new Bound().withCompressionType(compressionType);
     }
 
     // TODO: strippingNewlines, etc.
@@ -196,33 +161,27 @@ public class TextIO {
     /**
      * A {@link PTransform} that reads from one or more text files and returns 
a bounded
      * {@link PCollection} containing one element for each line of the input 
files.
-     *
-     * @param <T> the type of each of the elements of the resulting
-     * {@link PCollection}. By default, each line is returned as a {@link 
String}, however you
-     * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to 
produce a
-     * {@code PCollection<T>} instead.
      */
-    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
+    public static class Bound extends PTransform<PBegin, PCollection<String>> {
       /** The filepattern to read from. */
       @Nullable private final ValueProvider<String> filepattern;
 
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
       /** An option to indicate if input validation is desired. Default is 
true. */
       private final boolean validate;
 
       /** Option to indicate the input source's compression type. Default is 
AUTO. */
       private final TextIO.CompressionType compressionType;
 
-      Bound(Coder<T> coder) {
-        this(null, null, coder, true, TextIO.CompressionType.AUTO);
+      private Bound() {
+        this(null, null, true, TextIO.CompressionType.AUTO);
       }
 
-      private Bound(@Nullable String name, @Nullable ValueProvider<String> 
filepattern,
-          Coder<T> coder, boolean validate, TextIO.CompressionType 
compressionType) {
+      private Bound(
+          @Nullable String name,
+          @Nullable ValueProvider<String> filepattern,
+          boolean validate,
+          TextIO.CompressionType compressionType) {
         super(name);
-        this.coder = coder;
         this.filepattern = filepattern;
         this.validate = validate;
         this.compressionType = compressionType;
@@ -236,32 +195,18 @@ public class TextIO {
        * <p>Does not modify this object.
 
        */
-      public Bound<T> from(String filepattern) {
+      public Bound from(String filepattern) {
         checkNotNull(filepattern, "Filepattern cannot be empty.");
-        return new Bound<>(name, StaticValueProvider.of(filepattern), coder, 
validate,
+        return new Bound(name, StaticValueProvider.of(filepattern), validate,
                            compressionType);
       }
 
       /**
        * Same as {@code from(filepattern)}, but accepting a {@link 
ValueProvider}.
        */
-      public Bound<T> from(ValueProvider<String> filepattern) {
+      public Bound from(ValueProvider<String> filepattern) {
         checkNotNull(filepattern, "Filepattern cannot be empty.");
-        return new Bound<>(name, filepattern, coder, validate, 
compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this 
one but
-       * that uses the given {@link Coder Coder&lt;X&gt;} to decode each of the
-       * lines of the file into a value of type {@code X}.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filepattern, coder, validate, 
compressionType);
+        return new Bound(name, filepattern, validate, compressionType);
       }
 
       /**
@@ -274,8 +219,8 @@ public class TextIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filepattern, coder, false, compressionType);
+      public Bound withoutValidation() {
+        return new Bound(name, filepattern, false, compressionType);
       }
 
       /**
@@ -287,12 +232,12 @@ public class TextIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> withCompressionType(TextIO.CompressionType 
compressionType) {
-        return new Bound<>(name, filepattern, coder, validate, 
compressionType);
+      public Bound withCompressionType(TextIO.CompressionType compressionType) 
{
+        return new Bound(name, filepattern, validate, compressionType);
       }
 
       @Override
-      public PCollection<T> expand(PBegin input) {
+      public PCollection<String> expand(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException("need to set the filepattern of a 
TextIO.Read transform");
         }
@@ -310,31 +255,31 @@ public class TextIO {
           }
         }
 
-        final Bounded<T> read = org.apache.beam.sdk.io.Read.from(getSource());
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
+        final Bounded<String> read = 
org.apache.beam.sdk.io.Read.from(getSource());
+        PCollection<String> pcol = input.getPipeline().apply("Read", read);
         // Honor the default output coder that would have been used by this 
PTransform.
         pcol.setCoder(getDefaultOutputCoder());
         return pcol;
       }
 
       // Helper to create a source specific to the requested compression type.
-      protected FileBasedSource<T> getSource() {
+      protected FileBasedSource<String> getSource() {
         switch (compressionType) {
           case UNCOMPRESSED:
-            return new TextSource<T>(filepattern, coder);
+            return new TextSource(filepattern);
           case AUTO:
-            return CompressedSource.from(new TextSource<T>(filepattern, 
coder));
+            return CompressedSource.from(new TextSource(filepattern));
           case BZIP2:
             return
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
+                CompressedSource.from(new TextSource(filepattern))
                     .withDecompression(CompressedSource.CompressionMode.BZIP2);
           case GZIP:
             return
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
+                CompressedSource.from(new TextSource(filepattern))
                     .withDecompression(CompressedSource.CompressionMode.GZIP);
           case ZIP:
             return
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
+                CompressedSource.from(new TextSource(filepattern))
                     .withDecompression(CompressedSource.CompressionMode.ZIP);
           default:
             throw new IllegalArgumentException("Unknown compression type: " + 
compressionType);
@@ -357,8 +302,8 @@ public class TextIO {
       }
 
       @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return coder;
+      protected Coder<String> getDefaultOutputCoder() {
+        return DEFAULT_TEXT_CODER;
       }
 
       public String getFilepattern() {
@@ -399,23 +344,23 @@ public class TextIO {
      * a shard identifier (see {@link Bound#withNumShards(int)}, and end
      * in a common extension, if given by {@link Bound#withSuffix(String)}.
      */
-    public static Bound<String> to(String prefix) {
-      return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
+    public static Bound to(String prefix) {
+      return new Bound().to(prefix);
     }
 
     /**
      * Like {@link #to(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound<String> to(ValueProvider<String> prefix) {
-      return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
+    public static Bound to(ValueProvider<String> prefix) {
+      return new Bound().to(prefix);
     }
 
     /**
      * Returns a transform for writing to text files that appends the 
specified suffix
      * to the created files.
      */
-    public static Bound<String> withSuffix(String nameExtension) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withSuffix(nameExtension);
+    public static Bound withSuffix(String nameExtension) {
+      return new Bound().withSuffix(nameExtension);
     }
 
     /**
@@ -428,8 +373,8 @@ public class TextIO {
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
      */
-    public static Bound<String> withNumShards(int numShards) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withNumShards(numShards);
+    public static Bound withNumShards(int numShards) {
+      return new Bound().withNumShards(numShards);
     }
 
     /**
@@ -438,30 +383,16 @@ public class TextIO {
      *
      * <p>See {@link ShardNameTemplate} for a description of shard templates.
      */
-    public static Bound<String> withShardNameTemplate(String shardTemplate) {
-      return new 
Bound<>(DEFAULT_TEXT_CODER).withShardNameTemplate(shardTemplate);
+    public static Bound withShardNameTemplate(String shardTemplate) {
+      return new Bound().withShardNameTemplate(shardTemplate);
     }
 
     /**
      * Returns a transform for writing to text files that forces a single file 
as
      * output.
      */
-    public static Bound<String> withoutSharding() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutSharding();
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the given
-     * {@link Coder} to encode each of the elements of the input
-     * {@link PCollection} into an output text line.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input
-     * Java strings directly as output lines.
-     *
-     * @param <T> the type of the elements of the input {@link PCollection}
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
+    public static Bound withoutSharding() {
+      return new Bound().withoutSharding();
     }
 
     /**
@@ -472,8 +403,8 @@ public class TextIO {
      * not exist at the pipeline creation time, but is expected to be available
      * at execution time.
      */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
+    public static Bound withoutValidation() {
+      return new Bound().withoutValidation();
     }
 
     /**
@@ -484,8 +415,8 @@ public class TextIO {
      *
      * @param header the string to be added as file header
      */
-    public static Bound<String> withHeader(@Nullable String header) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
+    public static Bound withHeader(@Nullable String header) {
+      return new Bound().withHeader(header);
     }
 
     /**
@@ -496,8 +427,8 @@ public class TextIO {
      *
      * @param footer the string to be added as file footer
      */
-    public static Bound<String> withFooter(@Nullable String footer) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
+    public static Bound withFooter(@Nullable String footer) {
+      return new Bound().withFooter(footer);
     }
 
     /**
@@ -509,10 +440,9 @@ public class TextIO {
      *
      * @param writableByteChannelFactory the factory to be used during output
      */
-    public static Bound<String> withWritableByteChannelFactory(
+    public static Bound withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
-      return new Bound<>(DEFAULT_TEXT_CODER)
-          .withWritableByteChannelFactory(writableByteChannelFactory);
+      return new 
Bound().withWritableByteChannelFactory(writableByteChannelFactory);
     }
 
     // TODO: appendingNewlines, etc.
@@ -521,10 +451,8 @@ public class TextIO {
      * A PTransform that writes a bounded PCollection to a text file (or
      * multiple text files matching a sharding pattern), with each
      * PCollection element being encoded into its own line.
-     *
-     * @param <T> the type of the elements of the input PCollection
      */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+    public static class Bound extends PTransform<PCollection<String>, PDone> {
       private static final String DEFAULT_SHARD_TEMPLATE = 
ShardNameTemplate.INDEX_OF_MAX;
 
       /** The prefix of each file written, combined with suffix and 
shardTemplate. */
@@ -538,9 +466,6 @@ public class TextIO {
       /** An optional footer to add to each file. */
       @Nullable private final String footer;
 
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
       /** Requested number of shards. 0 for automatic. */
       private final int numShards;
 
@@ -556,19 +481,18 @@ public class TextIO {
        */
       private final WritableByteChannelFactory writableByteChannelFactory;
 
-      Bound(Coder<T> coder) {
-        this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, 
true,
+      private Bound() {
+        this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true,
             FileBasedSink.CompressionType.UNCOMPRESSED);
       }
 
       private Bound(String name, ValueProvider<String> filenamePrefix, String 
filenameSuffix,
-          @Nullable String header, @Nullable String footer, Coder<T> coder, 
int numShards,
+          @Nullable String header, @Nullable String footer, int numShards,
           String shardTemplate, boolean validate,
           WritableByteChannelFactory writableByteChannelFactory) {
         super(name);
         this.header = header;
         this.footer = footer;
-        this.coder = coder;
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
         this.numShards = numShards;
@@ -586,18 +510,18 @@ public class TextIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> to(String filenamePrefix) {
+      public Bound to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, StaticValueProvider.of(filenamePrefix), 
filenameSuffix,
-            header, footer, coder, numShards, shardTemplate, validate,
+        return new Bound(name, StaticValueProvider.of(filenamePrefix), 
filenameSuffix,
+            header, footer, numShards, shardTemplate, validate,
             writableByteChannelFactory);
       }
 
       /**
        * Like {@link #to(String)}, but with a {@link ValueProvider}.
        */
-      public Bound<T> to(ValueProvider<String> filenamePrefix) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+      public Bound to(ValueProvider<String> filenamePrefix) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -609,9 +533,9 @@ public class TextIO {
        *
        * @see ShardNameTemplate
        */
-      public Bound<T> withSuffix(String nameExtension) {
+      public Bound withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, header, 
footer, coder, numShards,
+        return new Bound(name, filenamePrefix, nameExtension, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -629,9 +553,9 @@ public class TextIO {
        *                  decide.
        * @see ShardNameTemplate
        */
-      public Bound<T> withNumShards(int numShards) {
+      public Bound withNumShards(int numShards) {
         checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -643,8 +567,8 @@ public class TextIO {
        *
        * @see ShardNameTemplate
        */
-      public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+      public Bound withShardNameTemplate(String shardTemplate) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -661,25 +585,12 @@ public class TextIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, 1, "",
+      public Bound withoutSharding() {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
1, "",
             validate, writableByteChannelFactory);
       }
 
       /**
-       * Returns a transform for writing to text files that's like this one
-       * but that uses the given {@link Coder Coder&lt;X&gt;} to encode each of
-       * the elements of the input {@link PCollection PCollection&lt;X&gt;} into an
-       * output text line. Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
-            shardTemplate, validate, writableByteChannelFactory);
-      }
-
-      /**
        * Returns a transform for writing to text files that's like this one but
        * that has GCS output path validation on pipeline creation disabled.
        *
@@ -689,8 +600,8 @@ public class TextIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+      public Bound withoutValidation() {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, false, writableByteChannelFactory);
       }
 
@@ -704,8 +615,8 @@ public class TextIO {
        *
        * @param header the string to be added as file header
        */
-      public Bound<T> withHeader(@Nullable String header) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+      public Bound withHeader(@Nullable String header) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -719,8 +630,8 @@ public class TextIO {
        *
        * @param footer the string to be added as file footer
        */
-      public Bound<T> withFooter(@Nullable String footer) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+      public Bound withFooter(@Nullable String footer) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
@@ -735,22 +646,22 @@ public class TextIO {
        *
        * @param writableByteChannelFactory the factory to be used during output
        */
-      public Bound<T> withWritableByteChannelFactory(
+      public Bound withWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, header, 
footer, coder, numShards,
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
 
       @Override
-      public PDone expand(PCollection<T> input) {
+      public PDone expand(PCollection<String> input) {
         if (filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-        org.apache.beam.sdk.io.Write.Bound<T> write =
+        org.apache.beam.sdk.io.Write.Bound<String> write =
             org.apache.beam.sdk.io.Write.to(
-                new TextSink<>(filenamePrefix, filenameSuffix, header, footer, 
shardTemplate,
-                    coder, writableByteChannelFactory));
+                new TextSink(filenamePrefix, filenameSuffix, header, footer, 
shardTemplate,
+                    writableByteChannelFactory));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
@@ -812,10 +723,6 @@ public class TextIO {
         return filenameSuffix;
       }
 
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
       @Nullable
       public String getHeader() {
         return header;
@@ -901,40 +808,38 @@ public class TextIO {
    * representing the beginning of the first record to be decoded.
    */
   @VisibleForTesting
-  static class TextSource<T> extends FileBasedSource<T> {
+  static class TextSource extends FileBasedSource<String> {
     /** The Coder to use to decode each line. */
-    private final Coder<T> coder;
-
     @VisibleForTesting
-    TextSource(String fileSpec, Coder<T> coder) {
+    TextSource(String fileSpec) {
       super(fileSpec, 1L);
-      this.coder = coder;
     }
 
     @VisibleForTesting
-    TextSource(ValueProvider<String> fileSpec, Coder<T> coder) {
+    TextSource(ValueProvider<String> fileSpec) {
       super(fileSpec, 1L);
-      this.coder = coder;
     }
 
-    private TextSource(String fileName, long start, long end, Coder<T> coder) {
+    private TextSource(String fileName, long start, long end) {
       super(fileName, 1L, start, end);
-      this.coder = coder;
     }
 
     @Override
-    protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long 
start, long end) {
-      return new TextSource<>(fileName, start, end, coder);
+    protected FileBasedSource<String> createForSubrangeOfFile(
+        String fileName,
+        long start,
+        long end) {
+      return new TextSource(fileName, start, end);
     }
 
     @Override
-    protected FileBasedReader<T> createSingleFileReader(PipelineOptions 
options) {
-      return new TextBasedReader<>(this);
+    protected FileBasedReader<String> createSingleFileReader(PipelineOptions 
options) {
+      return new TextBasedReader(this);
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
+    public Coder<String> getDefaultOutputCoder() {
+      return DEFAULT_TEXT_CODER;
     }
 
     /**
@@ -944,9 +849,8 @@ public class TextIO {
      * <p>See {@link TextSource} for further details.
      */
     @VisibleForTesting
-    static class TextBasedReader<T> extends FileBasedReader<T> {
+    static class TextBasedReader extends FileBasedReader<String> {
       private static final int READ_BUFFER_SIZE = 8192;
-      private final Coder<T> coder;
       private final ByteBuffer readBuffer = 
ByteBuffer.allocate(READ_BUFFER_SIZE);
       private ByteString buffer;
       private int startOfSeparatorInBuffer;
@@ -955,12 +859,11 @@ public class TextIO {
       private volatile long startOfNextRecord;
       private volatile boolean eof;
       private volatile boolean elementIsPresent;
-      private T currentValue;
+      private String currentValue;
       private ReadableByteChannel inChannel;
 
-      private TextBasedReader(TextSource<T> source) {
+      private TextBasedReader(TextSource source) {
         super(source);
-        coder = source.coder;
         buffer = ByteString.EMPTY;
       }
 
@@ -981,7 +884,7 @@ public class TextIO {
       }
 
       @Override
-      public T getCurrent() throws NoSuchElementException {
+      public String getCurrent() throws NoSuchElementException {
         if (!elementIsPresent) {
           throw new NoSuchElementException();
         }
@@ -1078,7 +981,7 @@ public class TextIO {
        */
       private void decodeCurrentElement() throws IOException {
         ByteString dataToDecode = buffer.substring(0, 
startOfSeparatorInBuffer);
-        currentValue = coder.decode(dataToDecode.newInput(), Context.OUTER);
+        currentValue = dataToDecode.toStringUtf8();
         elementIsPresent = true;
         buffer = buffer.substring(endOfSeparatorInBuffer);
       }
@@ -1107,48 +1010,46 @@ public class TextIO {
    * Each record (including the last) is terminated.
    */
   @VisibleForTesting
-  static class TextSink<T> extends FileBasedSink<T> {
-    private final Coder<T> coder;
+  static class TextSink extends FileBasedSink<String> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
     @VisibleForTesting
     TextSink(
-        ValueProvider<String> baseOutputFilename, String extension,
-        @Nullable String header, @Nullable String footer,
-        String fileNameTemplate, Coder<T> coder,
+        ValueProvider<String> baseOutputFilename,
+        String extension,
+        @Nullable String header,
+        @Nullable String footer,
+        String fileNameTemplate,
         WritableByteChannelFactory writableByteChannelFactory) {
       super(baseOutputFilename, extension, fileNameTemplate, 
writableByteChannelFactory);
-      this.coder = coder;
       this.header = header;
       this.footer = footer;
     }
 
     @Override
-    public FileBasedSink.FileBasedWriteOperation<T> 
createWriteOperation(PipelineOptions options) {
-      return new TextWriteOperation<>(this, coder, header, footer);
+    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation(
+        PipelineOptions options) {
+      return new TextWriteOperation(this, header, footer);
     }
 
     /**
      * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
      * FileBasedWriteOperation} for text files.
      */
-    private static class TextWriteOperation<T> extends 
FileBasedWriteOperation<T> {
-      private final Coder<T> coder;
+    private static class TextWriteOperation extends 
FileBasedWriteOperation<String> {
       @Nullable private final String header;
       @Nullable private final String footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder,
-          @Nullable String header, @Nullable String footer) {
+      private TextWriteOperation(TextSink sink, @Nullable String header, 
@Nullable String footer) {
         super(sink);
-        this.coder = coder;
         this.header = header;
         this.footer = footer;
       }
 
       @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws 
Exception {
-        return new TextWriter<>(this, coder, header, footer);
+      public FileBasedWriter createWriter(PipelineOptions options) throws 
Exception {
+        return new TextWriter(this, header, footer);
       }
     }
 
@@ -1156,35 +1057,42 @@ public class TextIO {
      * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter 
FileBasedWriter}
      * for text files.
      */
-    private static class TextWriter<T> extends FileBasedWriter<T> {
-      private static final byte[] NEWLINE = 
"\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<T> coder;
+    private static class TextWriter extends FileBasedWriter<String> {
+      private static final String NEWLINE = "\n";
       @Nullable private final String header;
       @Nullable private final String footer;
-      private OutputStream out;
+      private OutputStreamWriter out;
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> 
coder,
-          @Nullable String header, @Nullable String footer) {
+      public TextWriter(
+          FileBasedWriteOperation<String> writeOperation,
+          @Nullable String header,
+          @Nullable String footer) {
         super(writeOperation);
         this.header = header;
         this.footer = footer;
         this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
       }
 
       /**
-       * Writes {@code value} followed by a newline if {@code value} is not null.
+       * Writes {@code value} followed by a newline character if {@code value} is not null.
        */
       private void writeIfNotNull(@Nullable String value) throws IOException {
         if (value != null) {
-          out.write(value.getBytes(StandardCharsets.UTF_8));
-          out.write(NEWLINE);
+          writeLine(value);
         }
       }
 
+      /**
+       * Writes {@code value} followed by a newline character.
+       */
+      private void writeLine(String value) throws IOException {
+        out.write(value);
+        out.write(NEWLINE);
+      }
+
       @Override
       protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
-        out = Channels.newOutputStream(channel);
+        out = new OutputStreamWriter(Channels.newOutputStream(channel), 
StandardCharsets.UTF_8);
       }
 
       @Override
@@ -1195,12 +1103,13 @@ public class TextIO {
       @Override
       protected void writeFooter() throws Exception {
         writeIfNotNull(footer);
+        // Flush here because there is currently no other natural place to do this. [BEAM-1465]
+        out.flush();
       }
 
       @Override
-      public void write(T value) throws Exception {
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
+      public void write(String value) throws Exception {
+        writeLine(value);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/adba4c66/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index b8b28eb..6304603 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
-import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2;
@@ -79,8 +77,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TextualIntegerCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
@@ -215,28 +211,21 @@ public class TextIOTest {
     });
   }
 
-  private <T> void runTestRead(T[] expected, Coder<T> coder) throws Exception {
+  private <T> void runTestRead(String[] expected) throws Exception {
     File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile();
     String filename = tmpFile.getPath();
 
     try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
-      for (T elem : expected) {
-        byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
+      for (String elem : expected) {
+        byte[] encodedElem = 
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
         String line = new String(encodedElem);
         writer.println(line);
       }
     }
 
-    TextIO.Read.Bound<T> read;
-    if (coder.equals(StringUtf8Coder.of())) {
-      TextIO.Read.Bound<String> readStrings = TextIO.Read.from(filename);
-      // T==String
-      read = (TextIO.Read.Bound<T>) readStrings;
-    } else {
-      read = TextIO.Read.from(filename).withCoder(coder);
-    }
+    TextIO.Read.Bound read = TextIO.Read.from(filename);
 
-    PCollection<T> output = p.apply(read);
+    PCollection<String> output = p.apply(read);
 
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();
@@ -245,31 +234,13 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadStrings() throws Exception {
-    runTestRead(LINES_ARRAY, StringUtf8Coder.of());
+    runTestRead(LINES_ARRAY);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testReadEmptyStrings() throws Exception {
-    runTestRead(NO_LINES_ARRAY, StringUtf8Coder.of());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadInts() throws Exception {
-    runTestRead(INTS_ARRAY, TextualIntegerCoder.of());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadEmptyInts() throws Exception {
-    runTestRead(NO_INTS_ARRAY, TextualIntegerCoder.of());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadNulls() throws Exception {
-    runTestRead(new Void[] {null, null, null}, VoidCoder.of());
+    runTestRead(NO_LINES_ARRAY);
   }
 
   @Test
@@ -286,7 +257,7 @@ public class TextIOTest {
 
   @Test
   public void testReadDisplayData() {
-    TextIO.Read.Bound<?> read = TextIO.Read
+    TextIO.Read.Bound read = TextIO.Read
         .from("foo.*")
         .withCompressionType(BZIP2)
         .withoutValidation();
@@ -303,7 +274,7 @@ public class TextIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    TextIO.Read.Bound<String> read = TextIO.Read
+    TextIO.Read.Bound read = TextIO.Read
         .from("foobar")
         .withoutValidation();
 
@@ -312,36 +283,32 @@ public class TextIOTest {
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
 
-  private <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
-    runTestWrite(elems, null, null, coder, 1);
+  private void runTestWrite(String[] elems) throws Exception {
+    runTestWrite(elems, null, null, 1);
   }
 
-  private <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) 
throws Exception {
-    runTestWrite(elems, null, null, coder, numShards);
+  private void runTestWrite(String[] elems, int numShards) throws Exception {
+    runTestWrite(elems, null, null, numShards);
   }
 
-  private <T> void runTestWrite(T[] elems, Coder<T> coder, String header, 
String footer)
+  private void runTestWrite(String[] elems, String header, String footer)
       throws Exception {
-    runTestWrite(elems, header, footer, coder, 1);
+    runTestWrite(elems, header, footer, 1);
   }
 
-  private <T> void runTestWrite(
-      T[] elems, String header, String footer, Coder<T> coder, int numShards) 
throws Exception {
+  private void runTestWrite(
+      String[] elems, String header, String footer, int numShards) throws 
Exception {
     String outputName = "file.txt";
     Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
     String baseFilename = baseDir.resolve(outputName).toString();
 
-    PCollection<T> input = 
p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
+    PCollection<String> input =
+        
p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
 
-    TextIO.Write.Bound<T> write;
-    if (coder.equals(StringUtf8Coder.of())) {
-      TextIO.Write.Bound<String> writeStrings = TextIO.Write.to(baseFilename);
-      // T==String
-      write = (TextIO.Write.Bound<T>) writeStrings;
-    } else {
-      write = TextIO.Write.to(baseFilename).withCoder(coder);
-    }
-    write = write.withHeader(header).withFooter(footer);
+    TextIO.Write.Bound write =
+        TextIO.Write.to(baseFilename)
+            .withHeader(header)
+            .withFooter(footer);
 
     if (numShards == 1) {
       write = write.withoutSharding();
@@ -353,15 +320,14 @@ public class TextIOTest {
 
     p.run();
 
-    assertOutputFiles(elems, header, footer, coder, numShards, baseDir, 
outputName,
+    assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
         write.getShardNameTemplate());
   }
 
-  public static <T> void assertOutputFiles(
-      T[] elems,
+  public static void assertOutputFiles(
+      String[] elems,
       final String header,
       final String footer,
-      Coder<T> coder,
       int numShards,
       Path rootLocation,
       String outputName,
@@ -400,8 +366,8 @@ public class TextIOTest {
     }
 
     List<String> expectedElements = new ArrayList<>(elems.length);
-    for (T elem : elems) {
-      byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
+    for (String elem : elems) {
+      byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), 
elem);
       String line = new String(encodedElem);
       expectedElements.add(line);
     }
@@ -453,55 +419,43 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testWriteStrings() throws Exception {
-    runTestWrite(LINES_ARRAY, StringUtf8Coder.of());
+    runTestWrite(LINES_ARRAY);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteEmptyStringsNoSharding() throws Exception {
-    runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of(), 0);
+    runTestWrite(NO_LINES_ARRAY, 0);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteEmptyStrings() throws Exception {
-    runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteInts() throws Exception {
-    runTestWrite(INTS_ARRAY, TextualIntegerCoder.of());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteEmptyInts() throws Exception {
-    runTestWrite(NO_INTS_ARRAY, TextualIntegerCoder.of());
+    runTestWrite(NO_LINES_ARRAY);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testShardedWrite() throws Exception {
-    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
+    runTestWrite(LINES_ARRAY, 5);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithHeader() throws Exception {
-    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null);
+    runTestWrite(LINES_ARRAY, MY_HEADER, null);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithFooter() throws Exception {
-    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER);
+    runTestWrite(LINES_ARRAY, null, MY_FOOTER);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithHeaderAndFooter() throws Exception {
-    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER);
+    runTestWrite(LINES_ARRAY, MY_HEADER, MY_FOOTER);
   }
 
   @Test
@@ -515,7 +469,7 @@ public class TextIOTest {
 
     final WritableByteChannelFactory writableByteChannelFactory =
         new DrunkWritableByteChannelFactory();
-    TextIO.Write.Bound<String> write = 
TextIO.Write.to(baseDir.resolve(outputName).toString())
+    TextIO.Write.Bound write = 
TextIO.Write.to(baseDir.resolve(outputName).toString())
         
.withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", 
"DRUNK"));
@@ -526,16 +480,16 @@ public class TextIOTest {
 
     final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 
2);
     for (String elem : LINES2_ARRAY) {
-      drunkElems.add(elem + elem);
-      drunkElems.add("");
+      drunkElems.add(elem);
+      drunkElems.add(elem);
     }
-    assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, 
baseDir,
+    assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, 
baseDir,
         outputName + writableByteChannelFactory.getFilenameSuffix(), 
write.getShardNameTemplate());
   }
 
   @Test
   public void testWriteDisplayData() {
-    TextIO.Write.Bound<?> write = TextIO.Write
+    TextIO.Write.Bound write = TextIO.Write
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
@@ -558,7 +512,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenHeader() {
-    TextIO.Write.Bound<?> write = TextIO.Write
+    TextIO.Write.Bound write = TextIO.Write
         .to("foo")
         .withHeader("myHeader");
 
@@ -570,7 +524,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenFooter() {
-    TextIO.Write.Bound<?> write = TextIO.Write
+    TextIO.Write.Bound write = TextIO.Write
         .to("foo")
         .withFooter("myFooter");
 
@@ -590,7 +544,7 @@ public class TextIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    TextIO.Write.Bound<?> write = TextIO.Write.to(outputPath);
+    TextIO.Write.Bound write = TextIO.Write.to(outputPath);
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("TextIO.Write should include the file prefix in its primitive 
display data",
@@ -649,21 +603,21 @@ public class TextIOTest {
 
   @Test
   public void testReadWithoutValidationFlag() throws Exception {
-    TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/foo*/baz");
+    TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/foo*/baz");
     assertTrue(read.needsValidation());
     assertFalse(read.withoutValidation().needsValidation());
   }
 
   @Test
   public void testWriteWithoutValidationFlag() throws Exception {
-    TextIO.Write.Bound<String> write = TextIO.Write.to("gs://bucket/foo/baz");
+    TextIO.Write.Bound write = TextIO.Write.to("gs://bucket/foo/baz");
     assertTrue(write.needsValidation());
     assertFalse(write.withoutValidation().needsValidation());
   }
 
   @Test
   public void testCompressionTypeIsSet() throws Exception {
-    TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/test");
+    TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/test");
     assertEquals(AUTO, read.getCompressionType());
     read = TextIO.Read.from("gs://bucket/test").withCompressionType(GZIP);
     assertEquals(GZIP, read.getCompressionType());
@@ -688,7 +642,7 @@ public class TextIOTest {
   private void assertReadingCompressedFileMatchesExpected(
       File file, CompressionType compressionType, String[] expected) {
 
-    TextIO.Read.Bound<String> read =
+    TextIO.Read.Bound read =
         TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
     PCollection<String> output = p.apply("Read_" + file + "_" + 
compressionType.toString(), read);
 
@@ -1064,74 +1018,74 @@ public class TextIOTest {
   }
 
   private void runTestReadWithData(byte[] data, List<String> expectedResults) 
throws Exception {
-    TextSource<String> source = prepareSource(data);
+    TextSource source = prepareSource(data);
     List<String> actual = SourceTestUtils.readFromSource(source, 
PipelineOptionsFactory.create());
     assertThat(actual, containsInAnyOrder(new 
ArrayList<>(expectedResults).toArray(new String[0])));
   }
 
   @Test
   public void testSplittingSourceWithEmptyLines() throws Exception {
-    TextSource<String> source = 
prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithLineFeedDelimiter() throws Exception {
-    TextSource<String> source = 
prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithCarriageReturnDelimiter() throws 
Exception {
-    TextSource<String> source = 
prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() 
throws Exception {
-    TextSource<String> source = prepareSource(
+    TextSource source = prepareSource(
         "asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithMixedDelimiters() throws Exception {
-    TextSource<String> source = prepareSource(
+    TextSource source = prepareSource(
         "asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource<String> source = 
prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void 
testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd()
       throws Exception {
-    TextSource<String> source = 
prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void 
testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd()
       throws Exception {
-    TextSource<String> source = prepareSource(
+    TextSource source = prepareSource(
         "asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
   @Test
   public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() 
throws Exception {
-    TextSource<String> source = 
prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8));
+    TextSource source = 
prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8));
     SourceTestUtils.assertSplitAtFractionExhaustive(source, 
PipelineOptionsFactory.create());
   }
 
-  private TextSource<String> prepareSource(byte[] data) throws IOException {
+  private TextSource prepareSource(byte[] data) throws IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new TextSource<>(path.toString(), StringUtf8Coder.of());
+    return new TextSource(path.toString());
   }
 
   @Test

Reply via email to