http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
new file mode 100644
index 0000000..4f45cc9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
+import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
+import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that writes to a {@link FileBasedSink}. A write begins 
with a sequential
+ * global initialization of a sink, followed by a parallel write, and ends 
with a sequential
+ * finalization of the write. The output of a write is {@link PDone}.
+ *
+ * <p>By default, every bundle in the input {@link PCollection} will be 
processed by a
+ * {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation}, so 
the number of outputs
+ * will vary based on runner behavior, though at least 1 output will always be 
produced. The
+ * exact parallelism of the write stage can be controlled using {@link 
WriteFiles#withNumShards},
+ * typically used to control how many files are produced or to globally limit 
the number of
+ * workers connecting to an external service. However, this option can often 
hurt performance: it
+ * adds an additional {@link GroupByKey} to the pipeline.
+ *
+ * <p>Example usage with runner-determined sharding:
+ *
+ * <pre>{@code p.apply(WriteFiles.to(new MySink(...)));}</pre>
+ *
+ * <p>Example usage with a fixed number of shards:
+ *
+ * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
+
+  private static final int UNKNOWN_SHARDNUM = -1;
+  private static final int UNKNOWN_NUMSHARDS = -1;
+
+  private FileBasedSink<T> sink;
+  private FileBasedWriteOperation<T> writeOperation;
+  // This allows the number of shards to be dynamically computed based on the 
input
+  // PCollection.
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> 
computeNumShards;
+  // We don't use a side input for static sharding, as we want this value to 
be updatable
+  // when a pipeline is updated.
+  @Nullable
+  private final ValueProvider<Integer> numShardsProvider;
+  private boolean windowedWrites;
+
+  /**
+   * Creates a {@link WriteFiles} transform that writes to the given {@link 
FileBasedSink}, letting
+   * the runner control how many different shards are produced.
+   */
+  public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
+    checkNotNull(sink, "sink");
+    return new WriteFiles<>(sink, null /* runner-determined sharding */, null, 
false);
+  }
+
+  private WriteFiles(
+      FileBasedSink<T> sink,
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> 
computeNumShards,
+      @Nullable ValueProvider<Integer> numShardsProvider,
+      boolean windowedWrites) {
+    this.sink = sink;
+    this.computeNumShards = computeNumShards;
+    this.numShardsProvider = numShardsProvider;
+    this.windowedWrites = windowedWrites;
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+        "%s can only be applied to an unbounded PCollection if doing windowed 
writes",
+        WriteFiles.class.getSimpleName());
+    PipelineOptions options = input.getPipeline().getOptions();
+    sink.validate(options);
+    this.writeOperation = sink.createWriteOperation(options);
+    this.writeOperation.setWindowedWrites(windowedWrites);
+    return createWrite(input);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles 
Sink"))
+        .include("sink", sink);
+    if (getSharding() != null) {
+      builder.include("sharding", getSharding());
+    } else if (getNumShards() != null) {
+      String numShards = getNumShards().isAccessible()
+          ? getNumShards().get().toString() : getNumShards().toString();
+      builder.add(DisplayData.item("numShards", numShards)
+          .withLabel("Fixed Number of Shards"));
+    }
+  }
+
+  /**
+   * Returns the {@link FileBasedSink} associated with this PTransform.
+   */
+  public FileBasedSink<T> getSink() {
+    return sink;
+  }
+
+  /**
+   * Gets the {@link PTransform} that will be used to determine sharding. This 
can be either a
+   * static number of shards (following a call to {@link 
#withNumShards(int)}), dynamic (by
+   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
+   * #withRunnerDeterminedSharding()}).
+   */
+  @Nullable
+  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+    return computeNumShards;
+  }
+
+  public ValueProvider<Integer> getNumShards() {
+    return numShardsProvider;
+  }
+
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link 
FileBasedSink} using the
+   * specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See 
{@link WriteFiles} for
+   * more information.
+   *
+   * <p>A value less than or equal to 0 will be equivalent to the default 
behavior of
+   * runner-determined sharding.
+   */
+  public WriteFiles<T> withNumShards(int numShards) {
+    if (numShards > 0) {
+      return withNumShards(StaticValueProvider.of(numShards));
+    }
+    return withRunnerDeterminedSharding();
+  }
+
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link 
FileBasedSink} using the
+   * number of shards specified by the given {@link ValueProvider}.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See 
{@link WriteFiles} for
+   * more information.
+   */
+  public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) 
{
+    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link 
FileBasedSink} using the
+   * specified {@link PTransform} to compute the number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See 
{@link WriteFiles} for
+   * more information.
+   */
+  public WriteFiles<T> withSharding(PTransform<PCollection<T>, 
PCollectionView<Integer>> sharding) {
+    checkNotNull(
+        sharding, "Cannot provide null sharding. Use 
withRunnerDeterminedSharding() instead");
+    return new WriteFiles<>(sink, sharding, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link 
FileBasedSink} with
+   * runner-determined sharding.
+   */
+  public WriteFiles<T> withRunnerDeterminedSharding() {
+    return new WriteFiles<>(sink, null, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link WriteFiles} that preserves windowing on its input.
+   *
+   * <p>If this option is not specified, windowing and triggering are replaced 
by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
+   *
+   * <p>If there is no data for a window, no output shards will be generated 
for that window.
+   * If a window triggers multiple times, its output shards may be generated multiple
+   * times; it's up to the sink implementation to keep these output shards unique.
+   *
+   * <p>This option can only be used if {@link #withNumShards(int)} is also 
set to a
+   * positive value.
+   */
+  public WriteFiles<T> withWindowedWrites() {
+    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
+  }
+
+  /**
+   * Writes all the elements in a bundle using a {@link FileBasedWriter} 
produced by the
+   * {@link FileBasedSink.FileBasedWriteOperation} associated with the {@link 
FileBasedSink}.
+   */
+  private class WriteBundles extends DoFn<T, FileResult> {
+    // Writer that will write the records in this bundle. Lazily
+    // initialized in processElement.
+    private FileBasedWriter<T> writer = null;
+
+    WriteBundles() {
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
+      // Lazily initialize the Writer
+      if (writer == null) {
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter(c.getPipelineOptions());
+
+        if (windowedWrites) {
+          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), 
UNKNOWN_SHARDNUM,
+              UNKNOWN_NUMSHARDS);
+        } else {
+          writer.openUnwindowed(UUID.randomUUID().toString(), 
UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+        }
+        LOG.debug("Done opening writer {} for operation {}", writer, 
writeOperation);
+      }
+      try {
+        writer.write(c.element());
+      } catch (Exception e) {
+        // Discard write result and close the write.
+        try {
+          writer.close();
+          // The writer does not need to be reset, as this DoFn cannot be 
reused.
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
+          }
+          // Do not mask the exception that caused the write to fail.
+          e.addSuppressed(closeException);
+        }
+        throw e;
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (writer != null) {
+        FileResult result = writer.close();
+        c.output(result);
+        // Reset state in case of reuse.
+        writer = null;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(WriteFiles.this);
+    }
+  }
+
+  /**
+   * Like {@link WriteBundles}, but where the elements for each shard have 
been collected into
+   * a single iterable.
+   *
+   * @see WriteBundles
+   */
+  private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, 
FileResult> {
+    private final PCollectionView<Integer> numShardsView;
+
+    WriteShardedBundles(PCollectionView<Integer> numShardsView) {
+      this.numShardsView = numShardsView;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
+      int numShards = numShardsView != null ? c.sideInput(numShardsView) : 
getNumShards().get();
+      // In a sharded write, a single input element represents one shard. We can 
open and close
+      // the writer in each call to processElement.
+      LOG.info("Opening writer for write operation {}", writeOperation);
+      FileBasedWriter<T> writer = 
writeOperation.createWriter(c.getPipelineOptions());
+      if (windowedWrites) {
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), 
c.element().getKey(),
+            numShards);
+      } else {
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, 
UNKNOWN_NUMSHARDS);
+      }
+      LOG.debug("Done opening writer {} for operation {}", writer, 
writeOperation);
+
+      try {
+        try {
+          for (T t : c.element().getValue()) {
+            writer.write(t);
+          }
+        } catch (Exception e) {
+          try {
+            writer.close();
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
+        }
+
+        // Close the writer; if this throws let the error propagate.
+        FileResult result = writer.close();
+        c.output(result);
+      } catch (Exception e) {
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
+        throw e;
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(WriteFiles.this);
+    }
+  }
+
+  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+    private final PCollectionView<Integer> numShardsView;
+    private final ValueProvider<Integer> numShardsProvider;
+    private int shardNumber;
+
+    ApplyShardingKey(PCollectionView<Integer> numShardsView,
+                     ValueProvider<Integer> numShardsProvider) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      shardNumber = UNKNOWN_SHARDNUM;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      int shardCount = 0;
+      if (numShardsView != null) {
+        shardCount = context.sideInput(numShardsView);
+      } else {
+        checkNotNull(numShardsProvider);
+        shardCount = numShardsProvider.get();
+      }
+      checkArgument(
+          shardCount > 0,
+          "Must have a positive number of shards specified for 
non-runner-determined sharding."
+              + " Got %s",
+          shardCount);
+      if (shardNumber == UNKNOWN_SHARDNUM) {
+        // We want to desynchronize the first record sharding key for each 
instance of
+        // ApplyShardingKey, so records in a small PCollection will be 
statistically balanced.
+        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
+      } else {
+        shardNumber = (shardNumber + 1) % shardCount;
+      }
+      context.output(KV.of(shardNumber, context.element()));
+    }
+  }
+
+  /**
+   * A write is performed as a sequence of three {@link ParDo}'s.
+   *
+   * <p>A singleton collection containing the FileBasedWriteOperation is used as a side
+   * input to a ParDo over the PCollection of elements to write. In this 
bundle-writing phase,
+   * {@link FileBasedWriteOperation#createWriter} is called to obtain a {@link 
FileBasedWriter}.
+   * {@link FileBasedWriter#open} and {@link FileBasedWriter#close} are called 
in
+   * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
+   * the {@link FileBasedWriter#write} method is called for every element in the 
bundle. The output
+   * of this ParDo is a PCollection of <i>writer result</i> objects (see 
{@link FileBasedSink}
+   * for a description of writer results), one for each bundle.
+   *
+   * <p>The final do-once ParDo uses a singleton collection as input and the 
collection of writer
+   * results as a side-input. In this ParDo, {@link 
FileBasedWriteOperation#finalize} is called
+   * to finalize the write.
+   *
+   * <p>If the write of any element in the PCollection fails, {@link 
FileBasedWriter#close} will be
+   * called before the exception that caused the write to fail is propagated 
and the write result
+   * will be discarded.
+   *
+   * <p>Since the {@link FileBasedWriteOperation} is serialized after the 
initialization ParDo and
+   * deserialized in the bundle-writing and finalization phases, any state 
change to the
+   * FileBasedWriteOperation object that occurs during initialization is 
visible in the latter
+   * phases. However, the FileBasedWriteOperation is not serialized after the 
bundle-writing
+   * phase. This is why implementations should guarantee that
+   * {@link FileBasedWriteOperation#createWriter} does not mutate 
the FileBasedWriteOperation.
+   */
+  private PDone createWrite(PCollection<T> input) {
+    Pipeline p = input.getPipeline();
+
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing 
triggers.
+      input =
+          input.apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
+
+
+    // Perform the per-bundle writes as a ParDo on the input PCollection (with 
the
+    // FileBasedWriteOperation as a side input) and collect the results of the 
writes in a
+    // PCollection. There is a dependency between this ParDo and the first (the
+    // FileBasedWriteOperation PCollection as a side input), so this will 
happen after the
+    // initial ParDo.
+    PCollection<FileResult> results;
+    final PCollectionView<Integer> numShardsView;
+    if (computeNumShards == null && numShardsProvider == null) {
+      if (windowedWrites) {
+        throw new IllegalStateException("When doing windowed writes, numShards 
must be set"
+            + "explicitly to a positive value");
+      }
+      numShardsView = null;
+      results = input
+          .apply("WriteBundles",
+              ParDo.of(new WriteBundles()));
+    } else {
+      if (computeNumShards != null) {
+        numShardsView = input.apply(computeNumShards);
+        results  = input
+            .apply("ApplyShardLabel", ParDo.of(
+                new ApplyShardingKey<T>(numShardsView, 
null)).withSideInputs(numShardsView))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles(numShardsView))
+                    .withSideInputs(numShardsView));
+      } else {
+        numShardsView = null;
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, 
numShardsProvider)))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles(null)));
+      }
+    }
+    results.setCoder(writeOperation.getFileResultCoder());
+
+    if (windowedWrites) {
+      // When processing streaming windowed writes, results will arrive 
multiple times. This
+      // means we can't share the below implementation that turns the results 
into a side input,
+      // as new data arriving into a side input does not trigger the listening 
DoFn. Instead
+      // we aggregate the result set using a singleton GroupByKey, so the DoFn 
will be triggered
+      // whenever new data arrives.
+      PCollection<KV<Void, FileResult>> keyedResults =
+          results.apply("AttachSingletonKey", WithKeys.<Void, 
FileResult>of((Void) null));
+      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), 
writeOperation.getFileResultCoder()));
+
+      // Is the continuation trigger sufficient?
+      keyedResults
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create())
+          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, 
Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<FileResult> results = 
Lists.newArrayList(c.element().getValue());
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
+            }
+          }));
+    } else {
+      final PCollectionView<Iterable<FileResult>> resultsView =
+          results.apply(View.<FileResult>asIterable());
+      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+      if (numShardsView != null) {
+        sideInputs.add(numShardsView);
+      }
+
+      // Finalize the write in another do-once ParDo on the singleton 
collection containing the
+      // Writer. The results from the per-bundle writes are given as an 
Iterable side input.
+      // The FileBasedWriteOperation's state is the same as after its 
initialization in the first
+      // do-once ParDo. There is a dependency between this ParDo and the 
parallel write (the writer
+      // results collection as a side input), so it will happen after the 
parallel write.
+      // For the non-windowed case, we guarantee that if no data is written 
but the user has
+      // set numShards, then all shards will be written out as empty files. 
For this reason we
+      // use a side input here.
+      PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
+      singletonCollection
+          .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<FileResult> results = 
Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation 
{}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor 
user-specified numShards if
+              // set.
+              int minShardsNeeded;
+              if (numShardsView != null) {
+                minShardsNeeded = c.sideInput(numShardsView);
+              } else if (numShardsProvider != null) {
+                minShardsNeeded = numShardsProvider.get();
+              } else {
+                minShardsNeeded = 1;
+              }
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written 
for a total of "
+                        + " {}.", extraShardsNeeded, results.size(), 
minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  FileBasedWriter<T> writer = 
writeOperation.createWriter(c.getPipelineOptions());
+                  writer.openUnwindowed(UUID.randomUUID().toString(), 
UNKNOWN_SHARDNUM,
+                      UNKNOWN_NUMSHARDS);
+                  FileResult emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
+              }
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
+            }
+          }).withSideInputs(sideInputs.build()));
+    }
+    return PDone.in(input.getPipeline());
+  }
+}
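
For reference, here is a minimal driver sketch (not part of this patch) exercising the 
sharding options described in the WriteFiles Javadoc above. It reuses the test-only 
SimpleSink added later in this commit; the output paths, transform names, and pipeline 
options are illustrative assumptions only.

    package org.apache.beam.sdk.io;  // same package as the test-only SimpleSink

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    /** Hypothetical driver showing runner-determined vs. fixed sharding. */
    public class WriteFilesUsageSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<String> lines = p.apply(Create.of("a", "b", "c"));

        // Default: runner-determined sharding; at least one output file is produced.
        lines.apply("WriteDefault",
            WriteFiles.to(new SimpleSink("/tmp/write-demo/default", ".txt")));

        // Fixed sharding: exactly three output files, at the cost of an extra GroupByKey.
        lines.apply("WriteThreeShards",
            WriteFiles.to(new SimpleSink("/tmp/write-demo/sharded", ".txt"))
                .withNumShards(3));

        p.run();
      }
    }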

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
index a53fb86..6ced5d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.values.PDone;
 public class XmlIO {
   // CHECKSTYLE.OFF: JavadocStyle
   /**
-   * Reads XML files. This source reads one or more XML files and
-   * creates a {@link PCollection} of a given type. Please note the example 
given below.
+   * Reads XML files. This source reads one or more XML files and creates a 
{@link PCollection} of a
+   * given type. Please note the example given below.
    *
    * <p>The XML file must be of the following form, where {@code root} and 
{@code record} are XML
    * element names that are defined by the user:
@@ -88,7 +88,8 @@ public class XmlIO {
    * Apache Beam.
    *
    * <h3>Permissions</h3>
-   * Permission requirements depend on the {@link 
org.apache.beam.sdk.runners.PipelineRunner
+   *
+   * <p>Permission requirements depend on the {@link 
org.apache.beam.sdk.runners.PipelineRunner
    * PipelineRunner} that is used to execute the Beam pipeline. Please refer 
to the documentation of
    * corresponding {@link PipelineRunner PipelineRunners} for more details.
    *
@@ -105,8 +106,8 @@ public class XmlIO {
 
   // CHECKSTYLE.OFF: JavadocStyle
   /**
-   * A {@link Sink} that outputs records as XML-formatted elements. Writes a 
{@link PCollection} of
-   * records from JAXB-annotated classes to a single file location.
+   * A {@link FileBasedSink} that outputs records as XML-formatted elements. 
Writes a {@link
+   * PCollection} of records from JAXB-annotated classes to a single file 
location.
    *
    * <p>Given a PCollection containing records of type T that can be 
marshalled to XML elements,
    * this Sink will produce a single file consisting of a single root element 
that contains all of
@@ -268,6 +269,7 @@ public class XmlIO {
 
       /**
        * Determine if a given filename matches a compression type based on its 
extension.
+       *
        * @param filename the filename to match
        * @return true iff the filename ends with the compression type's known 
extension.
        */
@@ -277,8 +279,8 @@ public class XmlIO {
     }
 
     /**
-     * Reads a single XML file or a set of XML files defined by a Java "glob"
-     * file pattern. Each XML file should be of the form defined in {@link 
#read}.
+     * Reads a single XML file or a set of XML files defined by a Java "glob" 
file pattern. Each XML
+     * file should be of the form defined in {@link #read}.
      */
     public Read<T> from(String fileOrPatternSpec) {
       return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
@@ -322,9 +324,9 @@ public class XmlIO {
     /**
      * Decompresses all input files using the specified compression type.
      *
-     * <p>If no compression type is specified, the default is {@link 
CompressionType#AUTO}.
-     * In this mode, the compression type of the file is determined by its 
extension.
-     * Supports .gz, .bz2, .zip and .deflate compression.
+     * <p>If no compression type is specified, the default is {@link 
CompressionType#AUTO}. In this
+     * mode, the compression type of the file is determined by its extension. 
Supports .gz, .bz2,
+     * .zip and .deflate compression.
      */
     public Read<T> withCompressionType(CompressionType compressionType) {
       return toBuilder().setCompressionType(compressionType).build();
@@ -415,7 +417,6 @@ public class XmlIO {
       abstract Write<T> build();
     }
 
-
     /**
      * Writes to files with the given path prefix.
      *
@@ -435,9 +436,7 @@ public class XmlIO {
       return toBuilder().setRecordClass(recordClass).build();
     }
 
-    /**
-     * Sets the enclosing root element for the generated XML files.
-     */
+    /** Sets the enclosing root element for the generated XML files. */
     public Write<T> withRootElement(String rootElement) {
       return toBuilder().setRootElement(rootElement).build();
     }
@@ -456,7 +455,7 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(org.apache.beam.sdk.io.Write.to(createSink()));
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
     }
 
     @VisibleForTesting
@@ -468,10 +467,10 @@ public class XmlIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       createSink().populateFileBasedDisplayData(builder);
       builder
-          .addIfNotNull(DisplayData.item("rootElement", getRootElement())
-              .withLabel("XML Root Element"))
-          .addIfNotNull(DisplayData.item("recordClass", getRecordClass())
-              .withLabel("XML Record Class"));
+          .addIfNotNull(
+              DisplayData.item("rootElement", getRootElement()).withLabel("XML 
Root Element"))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"));
     }
   }
 }
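
The Write transform above marshals JAXB-annotated record classes. A minimal sketch of 
such a record type (hypothetical names, not part of this patch); each element of the 
input PCollection becomes one XML element of this type under the root element configured 
with withRootElement:

    import javax.xml.bind.annotation.XmlRootElement;

    // Hypothetical JAXB-annotated record; public fields are marshalled by default.
    @XmlRootElement(name = "record")
    public class Record {
      public String name;
      public int value;

      // JAXB requires a no-argument constructor.
      public Record() {}

      public Record(String name, int value) {
        this.name = name;
        this.value = value;
      }
    }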

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 7700329..b890908 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -65,7 +65,7 @@ class XmlSink<T> extends FileBasedSink<T> {
   }
 
   /**
-   * {@link Sink.WriteOperation} for XML {@link Sink}s.
+   * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link 
FileBasedSink}s.
    */
   protected static final class XmlWriteOperation<T> extends 
FileBasedWriteOperation<T> {
     public XmlWriteOperation(XmlSink<T> sink) {
@@ -97,7 +97,7 @@ class XmlSink<T> extends FileBasedSink<T> {
   }
 
   /**
-   * A {@link Sink.Writer} that can write objects as XML elements.
+   * A {@link FileBasedWriter} that can write objects as XML elements.
    */
   protected static final class XmlWriter<T> extends FileBasedWriter<T> {
     final Marshaller marshaller;

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
index eb5db20..1e5c4dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
@@ -20,11 +20,12 @@ package org.apache.beam.sdk.values;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
  * {@link PDone} is the output of a {@link PTransform} that has a trivial 
result,
- * such as a {@link org.apache.beam.sdk.io.Write}.
+ * such as a {@link WriteFiles}.
  */
 public class PDone extends POutputValueBase {
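
As context for the Javadoc change above, a hedged sketch (not part of this patch) of a 
trivial sink-style transform that terminates a pipeline branch by returning PDone, the 
same way WriteFiles.expand does; the class and its behavior are hypothetical:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    /** Hypothetical transform with a trivial result: prints each element, returns PDone. */
    class LogAndDiscard<T> extends PTransform<PCollection<T>, PDone> {
      @Override
      public PDone expand(PCollection<T> input) {
        input.apply(ParDo.of(new DoFn<T, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            System.out.println(String.valueOf(c.element()));
          }
        }));
        return PDone.in(input.getPipeline());
      }
    }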
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 5b81ba8..65fb8ba 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -533,79 +533,6 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * A simple FileBasedSink that writes String values as lines with header and 
footer lines.
-   */
-  private static final class SimpleSink extends FileBasedSink<String> {
-    public SimpleSink(String baseOutputFilename, String extension) {
-      super(baseOutputFilename, extension);
-    }
-
-    public SimpleSink(String baseOutputFilename, String extension,
-        WritableByteChannelFactory writableByteChannelFactory) {
-      super(baseOutputFilename, extension, writableByteChannelFactory);
-    }
-
-    public SimpleSink(String baseOutputFilename, String extension, String 
fileNamingTemplate) {
-      super(baseOutputFilename, extension, fileNamingTemplate);
-    }
-
-    @Override
-    public SimpleWriteOperation createWriteOperation(PipelineOptions options) {
-      return new SimpleWriteOperation(this);
-    }
-
-    private static final class SimpleWriteOperation extends 
FileBasedWriteOperation<String> {
-      public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
-        super(sink, tempOutputFilename);
-      }
-
-      public SimpleWriteOperation(SimpleSink sink) {
-        super(sink);
-      }
-
-      @Override
-      public SimpleWriter createWriter(PipelineOptions options) throws 
Exception {
-        return new SimpleWriter(this);
-      }
-    }
-
-    private static final class SimpleWriter extends FileBasedWriter<String> {
-      static final String HEADER = "header";
-      static final String FOOTER = "footer";
-
-      private WritableByteChannel channel;
-
-      public SimpleWriter(SimpleWriteOperation writeOperation) {
-        super(writeOperation);
-      }
-
-      private static ByteBuffer wrap(String value) throws Exception {
-        return ByteBuffer.wrap((value + "\n").getBytes("UTF-8"));
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
-        this.channel = channel;
-      }
-
-      @Override
-      protected void writeHeader() throws Exception {
-        channel.write(wrap(HEADER));
-      }
-
-      @Override
-      protected void writeFooter() throws Exception {
-        channel.write(wrap(FOOTER));
-      }
-
-      @Override
-      public void write(String value) throws Exception {
-        channel.write(wrap(value));
-      }
-    }
-  }
-
-  /**
    * Build a SimpleSink with default options.
    */
   private SimpleSink buildSink() {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
new file mode 100644
index 0000000..e3cd9b6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A simple FileBasedSink that writes String values as lines with header and 
footer lines.
+ */
+class SimpleSink extends FileBasedSink<String> {
+  public SimpleSink(String baseOutputFilename, String extension) {
+    super(baseOutputFilename, extension);
+  }
+
+  public SimpleSink(String baseOutputFilename, String extension,
+                    WritableByteChannelFactory writableByteChannelFactory) {
+    super(baseOutputFilename, extension, writableByteChannelFactory);
+  }
+
+  public SimpleSink(String baseOutputFilename, String extension, String 
fileNamingTemplate) {
+    super(baseOutputFilename, extension, fileNamingTemplate);
+  }
+
+  @Override
+  public SimpleWriteOperation createWriteOperation(PipelineOptions options) {
+    return new SimpleWriteOperation(this);
+  }
+
+  static final class SimpleWriteOperation extends 
FileBasedWriteOperation<String> {
+    public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
+      super(sink, tempOutputFilename);
+    }
+
+    public SimpleWriteOperation(SimpleSink sink) {
+      super(sink);
+    }
+
+    @Override
+    public SimpleWriter createWriter(PipelineOptions options) throws Exception 
{
+      return new SimpleWriter(this);
+    }
+  }
+
+  static final class SimpleWriter extends FileBasedWriter<String> {
+    static final String HEADER = "header";
+    static final String FOOTER = "footer";
+
+    private WritableByteChannel channel;
+
+    public SimpleWriter(SimpleWriteOperation writeOperation) {
+      super(writeOperation);
+    }
+
+    private static ByteBuffer wrap(String value) throws Exception {
+      return ByteBuffer.wrap((value + "\n").getBytes("UTF-8"));
+    }
+
+    @Override
+    protected void prepareWrite(WritableByteChannel channel) throws Exception {
+      this.channel = channel;
+    }
+
+    @Override
+    protected void writeHeader() throws Exception {
+      channel.write(wrap(HEADER));
+    }
+
+    @Override
+    protected void writeFooter() throws Exception {
+      channel.write(wrap(FOOTER));
+    }
+
+    @Override
+    public void write(String value) throws Exception {
+      channel.write(wrap(value));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
new file mode 100644
index 0000000..ea0395d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
+import org.apache.beam.sdk.options.Description;
+import 
org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for the WriteFiles PTransform.
+ */
+@RunWith(JUnit4.class)
+public class WriteFilesTest {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @SuppressWarnings("unchecked") // covariant cast
+  private static final PTransform<PCollection<String>, PCollection<String>> 
IDENTITY_MAP =
+      (PTransform)
+          MapElements.via(
+              new SimpleFunction<String, String>() {
+                @Override
+                public String apply(String input) {
+                  return input;
+                }
+              });
+
+  private static final PTransform<PCollection<String>, 
PCollectionView<Integer>>
+      SHARDING_TRANSFORM =
+      new PTransform<PCollection<String>, PCollectionView<Integer>>() {
+        @Override
+        public PCollectionView<Integer> expand(PCollection<String> input) {
+          return null;
+        }
+      };
+
+  private static class WindowAndReshuffle<T> extends 
PTransform<PCollection<T>, PCollection<T>> {
+    private final Window<T> window;
+
+    public WindowAndReshuffle(Window<T> window) {
+      this.window = window;
+    }
+
+    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+      }
+    }
+
+    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, 
Iterable<T>>, T> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        for (T s : c.element().getValue()) {
+          c.output(s);
+        }
+      }
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply(window)
+          .apply(ParDo.of(new AddArbitraryKey<T>()))
+          .apply(GroupByKey.<Integer, T>create())
+          .apply(ParDo.of(new RemoveArbitraryKey<T>()));
+    }
+  }
+
+  private String appendToTempFolder(String filename) {
+    return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
+  }
+
+  private String getBaseOutputFilename() {
+    return appendToTempFolder("baseoutput");
+  }
+
+  /**
+   * Test a WriteFiles transform with a PCollection of elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWrite() throws IOException {
+    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive 
eagle",
+        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+  }
+
+  /**
+   * Test that WriteFiles with an empty input still produces one shard.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testEmptyWrite() throws IOException {
+    runWrite(Collections.<String>emptyList(), IDENTITY_MAP, 
getBaseOutputFilename());
+    checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
+        Optional.of(1));
+  }
+
+  /**
+   * Test that WriteFiles with a configured number of shards produces the 
desired number of shards
+   * even when there are many elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testShardedWrite() throws IOException {
+    runShardedWrite(
+        Arrays.asList("one", "two", "three", "four", "five", "six"),
+        IDENTITY_MAP,
+        getBaseOutputFilename(),
+        Optional.of(1));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCustomShardedWrite() throws IOException {
+    // Flag to validate that the pipeline options are passed to the Sink
+    WriteOptions options = 
TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+    options.setTestFlag("test_value");
+    Pipeline p = TestPipeline.create(options);
+
+    List<String> inputs = new ArrayList<>();
+    // Prepare timestamps for the elements.
+    List<Long> timestamps = new ArrayList<>();
+    for (long i = 0; i < 1000; i++) {
+      inputs.add(Integer.toString(3));
+      timestamps.add(i + 1);
+    }
+
+    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    WriteFiles<String> write = WriteFiles.to(sink).withSharding(new 
LargestInt());
+    p.apply(Create.timestamped(inputs, 
timestamps).withCoder(StringUtf8Coder.of()))
+        .apply(IDENTITY_MAP)
+        .apply(write);
+
+    p.run();
+
+    checkFileContents(getBaseOutputFilename(), inputs, Optional.of(3));
+  }
+
+  /**
+   * Test that WriteFiles with a configured number of shards produces the 
desired number of shards
+   * even when there are too few elements.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExpandShardedWrite() throws IOException {
+    runShardedWrite(
+        Arrays.asList("one", "two", "three", "four", "five", "six"),
+        IDENTITY_MAP,
+        getBaseOutputFilename(),
+        Optional.of(20));
+  }
+
+  /**
+   * Test a WriteFiles transform with an empty PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithEmptyPCollection() throws IOException {
+    List<String> inputs = new ArrayList<>();
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+  }
+
+  /**
+   * Test a WriteFiles with a windowed PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWindowed() throws IOException {
+    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive 
eagle",
+        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+    runWrite(
+        inputs, new 
WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
+        getBaseOutputFilename());
+  }
+
+  /**
+   * Test a WriteFiles with sessions.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithSessions() throws IOException {
+    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive 
eagle",
+        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+
+    runWrite(
+        inputs,
+        new WindowAndReshuffle<>(
+            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
+        getBaseOutputFilename());
+  }
+
+  @Test
+  public void testBuildWrite() {
+    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
+    assertThat((SimpleSink) write.getSink(), is(sink));
+    PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding 
=
+        write.getSharding();
+
+    assertThat(write.getSharding(), is(nullValue()));
+    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
+    assertThat(write.getNumShards().get(), equalTo(3));
+    assertThat(write.getSharding(), equalTo(originalSharding));
+
+    WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM);
+    assertThat((SimpleSink) write2.getSink(), is(sink));
+    assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
+    // original unchanged
+
+    WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding();
+    assertThat(writeUnsharded.getSharding(), nullValue());
+    assertThat(write.getSharding(), equalTo(originalSharding));
+  }
+
+  @Test
+  public void testDisplayData() {
+    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write = WriteFiles.to(sink);
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+    assertThat(displayData, includesDisplayDataFor("sink", sink));
+  }
+
+  @Test
+  public void testShardedDisplayData() {
+    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1);
+    DisplayData displayData = DisplayData.from(write);
+    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+    assertThat(displayData, includesDisplayDataFor("sink", sink));
+    assertThat(displayData, hasDisplayItem("numShards", "1"));
+  }
+
+  @Test
+  public void testCustomShardStrategyDisplayData() {
+    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    WriteFiles<String> write =
+        WriteFiles.to(sink)
+            .withSharding(
+                new PTransform<PCollection<String>, 
PCollectionView<Integer>>() {
+                  @Override
+                  public PCollectionView<Integer> expand(PCollection<String> 
input) {
+                    return null;
+                  }
+
+                  @Override
+                  public void populateDisplayData(DisplayData.Builder builder) 
{
+                    builder.add(DisplayData.item("spam", "ham"));
+                  }
+                });
+    DisplayData displayData = DisplayData.from(write);
+    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+    assertThat(displayData, includesDisplayDataFor("sink", sink));
+    assertThat(displayData, hasDisplayItem("spam", "ham"));
+  }
+
+  /**
+   * Performs a WriteFiles transform and verifies that it calls the appropriate methods
+   * on the test sink in the correct order, and that the elements of the PCollection
+   * are written to the sink.
+   */
+  private static void runWrite(
+      List<String> inputs, PTransform<PCollection<String>, 
PCollection<String>> transform,
+      String baseName) throws IOException {
+    runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
+  }
+
+  /**
+   * Performs a WriteFiles transform with the desired number of shards, and verifies that
+   * it calls the appropriate methods on the test sink in the correct order and that the
+   * elements of the PCollection are written to the sink. If numConfiguredShards is
+   * present, also verifies that the number of output shards is correct.
+   */
+  private static void runShardedWrite(
+      List<String> inputs,
+      PTransform<PCollection<String>, PCollection<String>> transform,
+      String baseName,
+      Optional<Integer> numConfiguredShards) throws IOException {
+    // Flag to validate that the pipeline options are passed to the Sink
+    WriteOptions options = 
TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+    options.setTestFlag("test_value");
+    Pipeline p = TestPipeline.create(options);
+
+    // Prepare timestamps for the elements.
+    List<Long> timestamps = new ArrayList<>();
+    for (long i = 0; i < inputs.size(); i++) {
+      timestamps.add(i + 1);
+    }
+
+    SimpleSink sink = new SimpleSink(baseName, "");
+    WriteFiles<String> write = WriteFiles.to(sink);
+    if (numConfiguredShards.isPresent()) {
+      write = write.withNumShards(numConfiguredShards.get());
+    }
+    p.apply(Create.timestamped(inputs, 
timestamps).withCoder(StringUtf8Coder.of()))
+        .apply(transform)
+        .apply(write);
+    p.run();
+
+    checkFileContents(baseName, inputs, numConfiguredShards);
+  }
+
+  static void checkFileContents(String baseName, List<String> inputs,
+                                Optional<Integer> numExpectedShards) throws 
IOException {
+    List<File> outputFiles = Lists.newArrayList();
+    final String pattern = baseName + "*";
+    for (String outputFileName : 
IOChannelUtils.getFactory(pattern).match(pattern)) {
+      outputFiles.add(new File(outputFileName));
+    }
+    if (numExpectedShards.isPresent()) {
+      assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
+    }
+
+    List<String> actual = Lists.newArrayList();
+    for (File outputFile : outputFiles) {
+      try (BufferedReader reader = new BufferedReader(new 
FileReader(outputFile))) {
+        for (;;) {
+          String line = reader.readLine();
+          if (line == null) {
+            break;
+          }
+          if (!line.equals(SimpleWriter.HEADER) && 
!line.equals(SimpleWriter.FOOTER)) {
+            actual.add(line);
+          }
+        }
+      }
+    }
+    assertThat(actual, containsInAnyOrder(inputs.toArray()));
+  }
+
+  /**
+   * Options for test, exposed for PipelineOptionsFactory.
+   */
+  public interface WriteOptions extends TestPipelineOptions {
+    @Description("Test flag and value")
+    String getTestFlag();
+    void setTestFlag(String value);
+  }
+
+  /**
+   * Outputs the largest integer in a {@link PCollection} into a {@link 
PCollectionView}. The input
+   * {@link PCollection} must be convertible to integers via {@link 
Integer#valueOf(String)}.
+   */
+  private static class LargestInt
+      extends PTransform<PCollection<String>, PCollectionView<Integer>> {
+    @Override
+    public PCollectionView<Integer> expand(PCollection<String> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<String, Integer>() {
+                    @ProcessElement
+                    public void toInteger(ProcessContext ctxt) {
+                      ctxt.output(Integer.valueOf(ctxt.element()));
+                    }
+                  }))
+          .apply(Top.<Integer>largest(1))
+          .apply(Flatten.<Integer>iterables())
+          .apply(View.<Integer>asSingleton());
+    }
+  }
+}

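A note on the helper above: the pattern it exercises boils down to WriteFiles applied to a FileBasedSink with an optional fixed shard count. A minimal sketch of that usage, assuming the SimpleSink test sink used above and an illustrative output prefix (nothing below is part of the patch itself):

    Pipeline p = TestPipeline.create();
    p.apply(Create.of("a", "b", "c").withCoder(StringUtf8Coder.of()))
        // WriteFiles initializes the sink once, writes bundles in parallel,
        // and finalizes the produced files after all bundles complete.
        .apply(WriteFiles.to(new SimpleSink("/tmp/out/file", "")).withNumShards(2));
    p.run();

With withNumShards(2), a checkFileContents-style verification should find exactly two matching output files.
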
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
deleted file mode 100644
index 16d7f2a..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ /dev/null
@@ -1,705 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Optional;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for the Write PTransform.
- */
-@RunWith(JUnit4.class)
-public class WriteTest {
-  @Rule public final TestPipeline p = TestPipeline.create();
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  // Static store that can be accessed within the writer
-  private static List<String> sinkContents = new ArrayList<>();
-  // Static count of output shards
-  private static AtomicInteger numShards = new AtomicInteger(0);
-  // Static counts of the number of records per shard.
-  private static List<Integer> recordsPerShard = new ArrayList<>();
-
-  @SuppressWarnings("unchecked") // covariant cast
-  private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP =
-      (PTransform)
-          MapElements.via(
-              new SimpleFunction<String, String>() {
-                @Override
-                public String apply(String input) {
-                  return input;
-                }
-              });
-
-  private static final PTransform<PCollection<String>, PCollectionView<Integer>>
-      SHARDING_TRANSFORM =
-          new PTransform<PCollection<String>, PCollectionView<Integer>>() {
-            @Override
-            public PCollectionView<Integer> expand(PCollection<String> input) {
-              return null;
-            }
-          };
-
-  private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    private final Window<T> window;
-    public WindowAndReshuffle(Window<T> window) {
-      this.window = window;
-    }
-
-    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
-      }
-    }
-
-    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        for (T s : c.element().getValue()) {
-          c.output(s);
-        }
-      }
-    }
-
-    @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      return input
-          .apply(window)
-          .apply(ParDo.of(new AddArbitraryKey<T>()))
-          .apply(GroupByKey.<Integer, T>create())
-          .apply(ParDo.of(new RemoveArbitraryKey<T>()));
-    }
-  }
-
-  /**
-   * Test a Write transform with a PCollection of elements.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWrite() {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(inputs, IDENTITY_MAP);
-  }
-
-  /**
-   * Test that Write with an empty input still produces one shard.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testEmptyWrite() {
-    runWrite(Collections.<String>emptyList(), IDENTITY_MAP);
-    // Note we did not request a sharded write, so runWrite will not validate the number of shards.
-    assertThat(numShards.intValue(), greaterThan(0));
-  }
-
-  /**
-   * Test that Write with a configured number of shards produces the desired number of shards even
-   * when there are many elements.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testShardedWrite() {
-    runShardedWrite(
-        Arrays.asList("one", "two", "three", "four", "five", "six"),
-        IDENTITY_MAP,
-        Optional.of(1));
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCustomShardedWrite() {
-    // Flag to validate that the pipeline options are passed to the Sink
-    WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
-    options.setTestFlag("test_value");
-    Pipeline p = TestPipeline.create(options);
-
-    // Clear the sink's contents.
-    sinkContents.clear();
-    // Reset the number of shards produced.
-    numShards.set(0);
-    // Reset the number of records in each shard.
-    recordsPerShard.clear();
-
-    List<String> inputs = new ArrayList<>();
-    // Prepare timestamps for the elements.
-    List<Long> timestamps = new ArrayList<>();
-    for (long i = 0; i < 1000; i++) {
-      inputs.add(Integer.toString(3));
-      timestamps.add(i + 1);
-    }
-
-    TestSink sink = new TestSink();
-    Write<String> write = Write.to(sink).withSharding(new LargestInt());
-    p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
-        .apply(IDENTITY_MAP)
-        .apply(write);
-
-    p.run();
-    assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));
-    assertTrue(sink.hasCorrectState());
-    // The PCollection has values all equal to three, which should be fed as the sharding strategy
-    assertEquals(3, numShards.intValue());
-    assertEquals(3, recordsPerShard.size());
-  }
-
-  /**
-   * Test that Write with a configured number of shards produces the desired number of shards even
-   * when there are too few elements.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testExpandShardedWrite() {
-    runShardedWrite(
-        Arrays.asList("one", "two", "three", "four", "five", "six"),
-        IDENTITY_MAP,
-        Optional.of(20));
-  }
-
-  /**
-   * Tests that a Write can balance many elements.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testShardedWriteBalanced() {
-    int numElements = 1000;
-    List<String> inputs = new ArrayList<>(numElements);
-    for (int i = 0; i < numElements; ++i) {
-      inputs.add(String.format("elt%04d", i));
-    }
-
-    int numShards = 10;
-    runShardedWrite(
-        inputs,
-        new WindowAndReshuffle<>(
-            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
-        Optional.of(numShards));
-
-    // Check that both the min and max number of results per shard are close to the expected.
-    int min = Integer.MAX_VALUE;
-    int max = Integer.MIN_VALUE;
-    for (Integer i : recordsPerShard) {
-      min = Math.min(min, i);
-      max = Math.max(max, i);
-    }
-    double expected = numElements / (double) numShards;
-    assertThat((double) min, Matchers.greaterThanOrEqualTo(expected * 0.6));
-    assertThat((double) max, Matchers.lessThanOrEqualTo(expected * 1.4));
-  }
-
-  /**
-   * Test a Write transform with an empty PCollection.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteWithEmptyPCollection() {
-    List<String> inputs = new ArrayList<>();
-    runWrite(inputs, IDENTITY_MAP);
-  }
-
-  /**
-   * Test a Write with a windowed PCollection.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteWindowed() {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(
-        inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
-  }
-
-  /**
-   * Test a Write with sessions.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testWriteWithSessions() {
-    List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
-        "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-
-    runWrite(
-        inputs,
-        new WindowAndReshuffle<>(
-            Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
-  }
-
-  @Test
-  public void testBuildWrite() {
-    Sink<String> sink = new TestSink() {};
-    Write<String> write = Write.to(sink).withNumShards(3);
-    assertThat(write.getSink(), is(sink));
-    PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
-        write.getSharding();
-
-    assertThat(write.getSharding(), is(nullValue()));
-    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
-    assertThat(write.getNumShards().get(), equalTo(3));
-    assertThat(write.getSharding(), equalTo(originalSharding));
-
-    Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
-    assertThat(write2.getSink(), is(sink));
-    assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
-    // original unchanged
-
-    Write<String> writeUnsharded = write2.withRunnerDeterminedSharding();
-    assertThat(writeUnsharded.getSharding(), nullValue());
-    assertThat(write.getSharding(), equalTo(originalSharding));
-  }
-
-  @Test
-  public void testDisplayData() {
-    TestSink sink = new TestSink() {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    Write<String> write = Write.to(sink);
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
-    assertThat(displayData, includesDisplayDataFor("sink", sink));
-  }
-
-  @Test
-  public void testShardedDisplayData() {
-    TestSink sink = new TestSink() {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    Write<String> write = Write.to(sink).withNumShards(1);
-    DisplayData displayData = DisplayData.from(write);
-    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
-    assertThat(displayData, includesDisplayDataFor("sink", sink));
-    assertThat(displayData, hasDisplayItem("numShards", "1"));
-  }
-
-  @Test
-  public void testCustomShardStrategyDisplayData() {
-    TestSink sink = new TestSink() {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-    Write<String> write =
-        Write.to(sink)
-            .withSharding(
-                new PTransform<PCollection<String>, PCollectionView<Integer>>() {
-                  @Override
-                  public PCollectionView<Integer> expand(PCollection<String> input) {
-                    return null;
-                  }
-
-                  @Override
-                  public void populateDisplayData(DisplayData.Builder builder) {
-                    builder.add(DisplayData.item("spam", "ham"));
-                  }
-                });
-    DisplayData displayData = DisplayData.from(write);
-    assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
-    assertThat(displayData, includesDisplayDataFor("sink", sink));
-    assertThat(displayData, hasDisplayItem("spam", "ham"));
-  }
-
-  /**
-   * Performs a Write transform and verifies the Write transform calls the appropriate methods on
-   * a test sink in the correct order, as well as verifies that the elements of a PCollection are
-   * written to the sink.
-   */
-  private static void runWrite(
-      List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
-    runShardedWrite(inputs, transform, Optional.<Integer>absent());
-  }
-
-  /**
-   * Performs a Write transform with the desired number of shards. Verifies the Write transform
-   * calls the appropriate methods on a test sink in the correct order, as well as verifies that
-   * the elements of a PCollection are written to the sink. If numConfiguredShards is not null, also
-   * verifies that the output number of shards is correct.
-   */
-  private static void runShardedWrite(
-      List<String> inputs,
-      PTransform<PCollection<String>, PCollection<String>> transform,
-      Optional<Integer> numConfiguredShards) {
-    // Flag to validate that the pipeline options are passed to the Sink
-    WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
-    options.setTestFlag("test_value");
-    Pipeline p = TestPipeline.create(options);
-
-    // Clear the sink's contents.
-    sinkContents.clear();
-    // Reset the number of shards produced.
-    numShards.set(0);
-    // Reset the number of records in each shard.
-    recordsPerShard.clear();
-
-    // Prepare timestamps for the elements.
-    List<Long> timestamps = new ArrayList<>();
-    for (long i = 0; i < inputs.size(); i++) {
-      timestamps.add(i + 1);
-    }
-
-    TestSink sink = new TestSink();
-    Write<String> write = Write.to(sink);
-    if (numConfiguredShards.isPresent()) {
-      write = write.withNumShards(numConfiguredShards.get());
-    }
-    p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
-     .apply(transform)
-     .apply(write);
-
-    p.run();
-    assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));
-    assertTrue(sink.hasCorrectState());
-    if (numConfiguredShards.isPresent()) {
-      assertEquals(numConfiguredShards.get().intValue(), numShards.intValue());
-      assertEquals(numConfiguredShards.get().intValue(), recordsPerShard.size());
-    }
-  }
-
-  // Test sink and associated write operation and writer. TestSink, TestSinkWriteOperation, and
-  // TestSinkWriter each verify that the sequence of method calls is consistent with the specification
-  // of the Write PTransform.
-  private static class TestSink extends Sink<String> {
-    private boolean createCalled = false;
-    private boolean validateCalled = false;
-
-    @Override
-    public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
-      assertTrue(validateCalled);
-      assertTestFlagPresent(options);
-      createCalled = true;
-      return new TestSinkWriteOperation(this);
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      assertTestFlagPresent(options);
-      validateCalled = true;
-    }
-
-    private void assertTestFlagPresent(PipelineOptions options) {
-      assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
-    }
-
-    private boolean hasCorrectState() {
-      return validateCalled && createCalled;
-    }
-
-    /**
-     * Implementation of equals() that indicates all test sinks are equal.
-     */
-    @Override
-    public boolean equals(Object other) {
-      return (other instanceof TestSink);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("createCalled", createCalled)
-          .add("validateCalled", validateCalled)
-          .toString();
-    }
-  }
-
-  private static class TestSinkWriteOperation extends WriteOperation<String, TestWriterResult> {
-    private enum State {
-      INITIAL,
-      INITIALIZED,
-      FINALIZED
-    }
-
-    // Must be static in case the WriteOperation is serialized before its coder is obtained.
-    // If this occurs, the value will be modified but not reflected in the WriteOperation that is
-    // executed by the runner, and the finalize method will fail.
-    private static volatile boolean coderCalled = false;
-
-    private State state = State.INITIAL;
-
-    private final TestSink sink;
-    private final UUID id = UUID.randomUUID();
-
-    public TestSinkWriteOperation(TestSink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public TestSink getSink() {
-      return sink;
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {
-      assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
-      assertThat(state, anyOf(equalTo(State.INITIAL), equalTo(State.INITIALIZED)));
-      state = State.INITIALIZED;
-    }
-
-    @Override
-    public void setWindowedWrites(boolean windowedWrites) {
-    }
-
-    @Override
-    public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options)
-        throws Exception {
-      assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
-      assertEquals(State.INITIALIZED, state);
-      // The coder for the test writer results should've been called.
-      assertTrue(coderCalled);
-      Set<String> idSet = new HashSet<>();
-      int resultCount = 0;
-      state = State.FINALIZED;
-      for (TestWriterResult result : bundleResults) {
-        resultCount += 1;
-        idSet.add(result.uId);
-        // Add the elements that were written to the sink's contents.
-        sinkContents.addAll(result.elementsWritten);
-        recordsPerShard.add(result.elementsWritten.size());
-      }
-      // Each result came from a unique id.
-      assertEquals(resultCount, idSet.size());
-    }
-
-    @Override
-    public Writer<String, TestWriterResult> createWriter(PipelineOptions options) {
-      return new TestSinkWriter(this);
-    }
-
-    @Override
-    public Coder<TestWriterResult> getWriterResultCoder() {
-      coderCalled = true;
-      return SerializableCoder.of(TestWriterResult.class);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("id", id)
-          .add("sink", sink)
-          .add("state", state)
-          .add("coderCalled", coderCalled)
-          .toString();
-    }
-
-    /**
-     * Implementation of equals() that does not depend on the state of the write operation,
-     * but only its specification. In general, write operations will have interesting
-     * specifications, but for a {@link TestSinkWriteOperation}, it is not the case. Instead,
-     * a unique identifier (that is serialized along with it) is used to simulate such a
-     * specification.
-     */
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof TestSinkWriteOperation)) {
-        return false;
-      }
-      TestSinkWriteOperation otherOperation = (TestSinkWriteOperation) other;
-      return sink.equals(otherOperation.sink)
-          && id.equals(otherOperation.id);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, sink);
-    }
-  }
-
-  private static class TestWriterResult implements Serializable {
-    String uId;
-    List<String> elementsWritten;
-
-    public TestWriterResult(String uId, List<String> elementsWritten) {
-      this.uId = uId;
-      this.elementsWritten = elementsWritten;
-    }
-  }
-
-  private static class TestSinkWriter extends Writer<String, TestWriterResult> {
-    private enum State {
-      INITIAL,
-      OPENED,
-      WRITING,
-      CLOSED
-    }
-
-    private State state = State.INITIAL;
-    private List<String> elementsWritten = new ArrayList<>();
-    private String uId;
-
-    private final TestSinkWriteOperation writeOperation;
-
-    public TestSinkWriter(TestSinkWriteOperation writeOperation) {
-      this.writeOperation = writeOperation;
-    }
-
-    @Override
-    public TestSinkWriteOperation getWriteOperation() {
-      return writeOperation;
-    }
-
-    @Override
-    public final void openWindowed(String uId,
-                                   BoundedWindow window,
-                                   PaneInfo paneInfo,
-                                   int shard,
-                                   int nShards) throws Exception {
-      numShards.incrementAndGet();
-      this.uId = uId;
-      assertEquals(State.INITIAL, state);
-      state = State.OPENED;
-    }
-
-    @Override
-    public final void openUnwindowed(String uId,
-                                     int shard,
-                                     int nShards) throws Exception {
-      numShards.incrementAndGet();
-      this.uId = uId;
-      assertEquals(State.INITIAL, state);
-      state = State.OPENED;
-    }
-
-    @Override
-    public void write(String value) throws Exception {
-      assertThat(state, anyOf(equalTo(State.OPENED), equalTo(State.WRITING)));
-      state = State.WRITING;
-      elementsWritten.add(value);
-    }
-
-    @Override
-    public TestWriterResult close() throws Exception {
-      assertThat(state, anyOf(equalTo(State.OPENED), equalTo(State.WRITING)));
-      state = State.CLOSED;
-      return new TestWriterResult(uId, elementsWritten);
-    }
-
-    @Override
-    public void cleanup() throws Exception {
-    }
-  }
-
-
-  /**
-   * Options for test, exposed for PipelineOptionsFactory.
-   */
-  public interface WriteOptions extends TestPipelineOptions {
-    @Description("Test flag and value")
-    String getTestFlag();
-    void setTestFlag(String value);
-  }
-
-  /**
-   * Outputs the largest integer in a {@link PCollection} into a {@link PCollectionView}. The input
-   * {@link PCollection} must be convertible to integers via {@link Integer#valueOf(String)}
-   */
-  private static class LargestInt
-      extends PTransform<PCollection<String>, PCollectionView<Integer>> {
-    @Override
-    public PCollectionView<Integer> expand(PCollection<String> input) {
-      return input
-          .apply(
-              ParDo.of(
-                  new DoFn<String, Integer>() {
-                    @ProcessElement
-                    public void toInteger(ProcessContext ctxt) {
-                      ctxt.output(Integer.valueOf(ctxt.element()));
-                    }
-                  }))
-          .apply(Top.<Integer>largest(1))
-          .apply(Flatten.<Integer>iterables())
-          .apply(View.<Integer>asSingleton());
-    }
-  }
-
-}

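The LargestInt transform at the bottom of the deleted file survives into the new WriteFilesTest, so the custom-sharding path stays covered. A minimal sketch of how that knob would be used with the renamed transform, mirroring the withSharding call in the deleted testCustomShardedWrite and assuming WriteFiles keeps the same withSharding hook that Write had (the SimpleSink and output prefix are illustrative):

    // Every element is "3", so LargestInt yields a singleton side input of 3 and
    // the write is expected to produce exactly three shards.
    p.apply(Create.of("3", "3", "3").withCoder(StringUtf8Coder.of()))
        .apply(WriteFiles.to(new SimpleSink("/tmp/out/file", ""))
            .withSharding(new LargestInt()));
    p.run();
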
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 2d55005..07b6b4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -143,7 +143,7 @@ public class TransformTreeTest {
               assertTrue(visited.add(TransformsSeen.SAMPLE));
               assertNotNull(node.getEnclosingNode());
               assertTrue(node.isCompositeNode());
-            } else if (transform instanceof Write) {
+            } else if (transform instanceof WriteFiles) {
               assertTrue(visited.add(TransformsSeen.WRITE));
               assertNotNull(node.getEnclosingNode());
               assertTrue(node.isCompositeNode());
@@ -165,7 +165,7 @@ public class TransformTreeTest {
             PTransform<?, ?> transform = node.getTransform();
             // Pick is a composite, should not be visited here.
             assertThat(transform, not(instanceOf(Combine.Globally.class)));
-            assertThat(transform, not(instanceOf(Write.class)));
+            assertThat(transform, not(instanceOf(WriteFiles.class)));
             if (transform instanceof Read.Bounded
                && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
               assertTrue(visited.add(TransformsSeen.READ));

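The TransformTreeTest hunks above only swap the instanceof checks from Write to WriteFiles; the visitor that drives them is unchanged. As a rough sketch of that visitor wiring (the visitor and node type names here are written from memory and may differ slightly between SDK versions):

    final Set<Class<?>> composites = new HashSet<>();
    pipeline.traverseTopologically(
        new Pipeline.PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            if (node.getTransform() != null) {
              // A WriteFiles application should be recorded here as a composite node.
              composites.add(node.getTransform().getClass());
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });
    assertTrue(composites.contains(WriteFiles.class));
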
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 15d61cb..aa9e41e 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -66,7 +65,8 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 /**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
+ * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
+ * output
  * format.
  *
  * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop
@@ -75,7 +75,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K
  * and V.
  *
- * <p>{@code HDFSFileSink} can be used by {@link org.apache.beam.sdk.io.Write} to create write
+ * <p>{@code HDFSFileSink} can be used by {@link Write} to create write
  * transform. See example below.
  *
  * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:

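The javadoc line above ends with "For example:", but the hunk stops before the example itself. The usage it alludes to has roughly the following shape; the toText helper, the namenode address, and the output path are assumptions for illustration rather than text quoted from the file:

    PCollection<String> records = pipeline.apply(Create.of("alpha", "beta"));
    // Write.to wraps the sink in the write transform; each bundle goes through the
    // configured Hadoop OutputFormat and the results are finalized once all bundles finish.
    records.apply(Write.to(HDFSFileSink.<String>toText("hdfs://namenode:8020/tmp/output")));
    pipeline.run();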