http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java new file mode 100644 index 0000000..4f45cc9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; +import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; +import org.apache.beam.sdk.io.FileBasedSink.FileResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that writes to a {@link FileBasedSink}. A write begins with a sequential + * global initialization of a sink, followed by a parallel write, and ends with a sequential + * finalization of the write. The output of a write is {@link PDone}. 
+ * + * <p>By default, every bundle in the input {@link PCollection} will be processed by a + * {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation}, so the number of outputs + * will vary based on runner behavior, though at least 1 output will always be produced. The + * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards}, + * typically used to control how many files are produced or to globally limit the number of + * workers connecting to an external service. However, this option can often hurt performance: it + * adds an additional {@link GroupByKey} to the pipeline. + * + * <p>Example usage with runner-determined sharding: + * + * <pre>{@code p.apply(WriteFiles.to(new MySink(...)));}</pre> + * + * <p>Example usage with a fixed number of shards: + * + * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre> + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); + + private static final int UNKNOWN_SHARDNUM = -1; + private static final int UNKNOWN_NUMSHARDS = -1; + + private FileBasedSink<T> sink; + private FileBasedWriteOperation<T> writeOperation; + // This allows the number of shards to be dynamically computed based on the input + // PCollection. + @Nullable + private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; + // We don't use a side input for static sharding, as we want this value to be updatable + // when a pipeline is updated. + @Nullable + private final ValueProvider<Integer> numShardsProvider; + private boolean windowedWrites; + + /** + * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting + * the runner control how many different shards are produced. + */ + public static <T> WriteFiles<T> to(FileBasedSink<T> sink) { + checkNotNull(sink, "sink"); + return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false); + } + + private WriteFiles( + FileBasedSink<T> sink, + @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, + @Nullable ValueProvider<Integer> numShardsProvider, + boolean windowedWrites) { + this.sink = sink; + this.computeNumShards = computeNumShards; + this.numShardsProvider = numShardsProvider; + this.windowedWrites = windowedWrites; + } + + @Override + public PDone expand(PCollection<T> input) { + checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, + "%s can only be applied to an unbounded PCollection if doing windowed writes", + WriteFiles.class.getSimpleName()); + PipelineOptions options = input.getPipeline().getOptions(); + sink.validate(options); + this.writeOperation = sink.createWriteOperation(options); + this.writeOperation.setWindowedWrites(windowedWrites); + return createWrite(input); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink")) + .include("sink", sink); + if (getSharding() != null) { + builder.include("sharding", getSharding()); + } else if (getNumShards() != null) { + String numShards = getNumShards().isAccessible() + ? 
getNumShards().get().toString() : getNumShards().toString(); + builder.add(DisplayData.item("numShards", numShards) + .withLabel("Fixed Number of Shards")); + } + } + + /** + * Returns the {@link FileBasedSink} associated with this PTransform. + */ + public FileBasedSink<T> getSink() { + return sink; + } + + /** + * Gets the {@link PTransform} that will be used to determine sharding. This can be either a + * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by + * {@link #withSharding(PTransform)}), or runner-determined (by {@link + * #withRunnerDeterminedSharding()}). + */ + @Nullable + public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { + return computeNumShards; + } + + public ValueProvider<Integer> getNumShards() { + return numShardsProvider; + } + + /** + * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the + * specified number of shards. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for + * more information. + * + * <p>A value less than or equal to 0 will be equivalent to the default behavior of + * runner-determined sharding. + */ + public WriteFiles<T> withNumShards(int numShards) { + if (numShards > 0) { + return withNumShards(StaticValueProvider.of(numShards)); + } + return withRunnerDeterminedSharding(); + } + + /** + * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the + * number of shards specified by the given {@link ValueProvider}. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for + * more information. + */ + public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) { + return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites); + } + + /** + * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the + * specified {@link PTransform} to compute the number of shards. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for + * more information. + */ + public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { + checkNotNull( + sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); + return new WriteFiles<>(sink, sharding, null, windowedWrites); + } + + /** + * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with + * runner-determined sharding. + */ + public WriteFiles<T> withRunnerDeterminedSharding() { + return new WriteFiles<>(sink, null, null, windowedWrites); + } + + /** + * Returns a new {@link WriteFiles} that preserves windowing on its input. + * + * <p>If this option is not specified, windowing and triggering are replaced by + * {@link GlobalWindows} and {@link DefaultTrigger}. + * + * <p>If there is no data for a window, no output shards will be generated for that window. + * If a window triggers multiple times, then more than a single output shard might be + * generated for that window; it's up to the sink implementation to keep these output shards + * unique. + * + * <p>This option can only be used if {@link #withNumShards(int)} is also set to a + * positive value. 
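+ * + * <p>For example, a windowed write with a fixed number of shards might look like the following, reusing the hypothetical {@code MySink} from the class-level examples ({@code MyElement} is likewise a placeholder element type): + * + * <pre>{@code + * p.apply(Window.<MyElement>into(FixedWindows.of(Duration.standardMinutes(1)))) + * .apply(WriteFiles.to(new MySink(...)).withNumShards(5).withWindowedWrites()); + * }</pre>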
+ */ + public WriteFiles<T> withWindowedWrites() { + return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true); + } + + /** + * Writes all the elements in a bundle using a {@link FileBasedWriter} produced by the + * {@link FileBasedSink.FileBasedWriteOperation} associated with the {@link FileBasedSink}. + */ + private class WriteBundles extends DoFn<T, FileResult> { + // Writer that will write the records in this bundle. Lazily + // initialized in processElement. + private FileBasedWriter<T> writer = null; + + WriteBundles() { + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + // Lazily initialize the Writer + if (writer == null) { + LOG.info("Opening writer for write operation {}", writeOperation); + writer = writeOperation.createWriter(c.getPipelineOptions()); + + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperation); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the writer. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (writer != null) { + FileResult result = writer.close(); + c.output(result); + // Reset state in case of reuse. + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteFiles.this); + } + } + + /** + * Like {@link WriteBundles}, but where the elements for each shard have been collected into + * a single iterable. + * + * @see WriteBundles + */ + private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> { + private final PCollectionView<Integer> numShardsView; + + WriteShardedBundles(PCollectionView<Integer> numShardsView) { + this.numShardsView = numShardsView; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); + // In a sharded write, a single input element represents one shard. We can open and close + // the writer in each call to processElement. + LOG.info("Opening writer for write operation {}", writeOperation); + FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions()); + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), + numShards); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperation); + + try { + try { + for (T t : c.element().getValue()) { + writer.write(t); + } + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. 
+ Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + + // Close the writer; if this throws let the error propagate. + FileResult result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(WriteFiles.this); + } + } + + private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { + private final PCollectionView<Integer> numShardsView; + private final ValueProvider<Integer> numShardsProvider; + private int shardNumber; + + ApplyShardingKey(PCollectionView<Integer> numShardsView, + ValueProvider<Integer> numShardsProvider) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + shardNumber = UNKNOWN_SHARDNUM; + } + + @ProcessElement + public void processElement(ProcessContext context) { + int shardCount = 0; + if (numShardsView != null) { + shardCount = context.sideInput(numShardsView); + } else { + checkNotNull(numShardsProvider); + shardCount = numShardsProvider.get(); + } + checkArgument( + shardCount > 0, + "Must have a positive number of shards specified for non-runner-determined sharding." + + " Got %s", + shardCount); + if (shardNumber == UNKNOWN_SHARDNUM) { + // We want to desynchronize the first record sharding key for each instance of + // ApplyShardingKey, so records in a small PCollection will be statistically balanced. + shardNumber = ThreadLocalRandom.current().nextInt(shardCount); + } else { + shardNumber = (shardNumber + 1) % shardCount; + } + context.output(KV.of(shardNumber, context.element())); + } + } + + /** + * A write is performed as a sequence of three {@link ParDo}s. + * + * <p>A singleton collection containing the FileBasedWriteOperation is used as a side + * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase, + * {@link FileBasedWriteOperation#createWriter} is called to obtain a {@link FileBasedWriter}. + * {@link FileBasedWriter#open} is called lazily for the first element of each bundle and + * {@link FileBasedWriter#close} in {@link DoFn.FinishBundle}, and the + * {@link FileBasedWriter#write} method is called for every element in the bundle. The output + * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink} + * for a description of writer results), one for each bundle. + * + * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer + * results as a side input. In this ParDo, {@link FileBasedWriteOperation#finalize} is called + * to finalize the write. + * + * <p>If the write of any element in the PCollection fails, {@link FileBasedWriter#close} will be + * called before the exception that caused the write to fail is propagated and the write result + * will be discarded. + * + * <p>Since the {@link FileBasedWriteOperation} is serialized after its initialization in + * {@link #expand} and deserialized in the bundle-writing and finalization phases, any state + * change to the FileBasedWriteOperation object that occurs during initialization is visible in + * the latter phases. However, the FileBasedWriteOperation is not serialized after the + * bundle-writing phase. This is why implementations should guarantee that + * {@link FileBasedWriteOperation#createWriter} does not mutate the FileBasedWriteOperation. 
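+ * + * <p>A rough sketch of the unsharded, non-windowed case (coders and side-input wiring elided; see the method body below for the actual graph): + * + * <pre>{@code + * PCollection<FileResult> results = + * input.apply("WriteBundles", ParDo.of(new WriteBundles())); + * PCollectionView<Iterable<FileResult>> resultsView = + * results.apply(View.<FileResult>asIterable()); + * // A final do-once ParDo reads resultsView and calls writeOperation.finalize(...). + * }</pre>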
+ */ + private PDone createWrite(PCollection<T> input) { + Pipeline p = input.getPipeline(); + + if (!windowedWrites) { + // Re-window the data into the global window and remove any existing triggers. + input = + input.apply( + Window.<T>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + } + + + // Perform the per-bundle writes as a ParDo on the input PCollection and collect the results + // of the writes in a PCollection. The DoFns below capture the FileBasedWriteOperation that + // was initialized in expand(), so the writes see its post-initialization state. + PCollection<FileResult> results; + final PCollectionView<Integer> numShardsView; + if (computeNumShards == null && numShardsProvider == null) { + if (windowedWrites) { + throw new IllegalStateException("When doing windowed writes, numShards must be set " + + "explicitly to a positive value"); + } + numShardsView = null; + results = input + .apply("WriteBundles", + ParDo.of(new WriteBundles())); + } else { + if (computeNumShards != null) { + numShardsView = input.apply(computeNumShards); + results = input + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView)) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles(numShardsView)) + .withSideInputs(numShardsView)); + } else { + numShardsView = null; + results = input + .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider))) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles(null))); + } + } + results.setCoder(writeOperation.getFileResultCoder()); + + if (windowedWrites) { + // When processing streaming windowed writes, results will arrive multiple times. This + // means we can't share the below implementation that turns the results into a side input, + // as new data arriving into a side input does not trigger the listening DoFn. Instead + // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered + // whenever new data arrives. + PCollection<KV<Void, FileResult>> keyedResults = + results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null)); + keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation.getFileResultCoder())); + + // Is the continuation trigger sufficient? + keyedResults + .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create()) + .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult> results = Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + })); + } else { + final PCollectionView<Iterable<FileResult>> resultsView = + results.apply(View.<FileResult>asIterable()); + ImmutableList.Builder<PCollectionView<?>> sideInputs = + ImmutableList.<PCollectionView<?>>builder().add(resultsView); + if (numShardsView != null) { + sideInputs.add(numShardsView); + } + + // Finalize the write in another do-once ParDo applied to a singleton collection. The + // results from the per-bundle writes are given as an Iterable side input. 
+ // The FileBasedWriteOperation's state is the same as after its initialization in expand(). + // There is a dependency between this ParDo and the parallel write (the writer + // results collection as a side input), so it will happen after the parallel write. + // For the non-windowed case, we guarantee that if no data is written but the user has + // set numShards, then all shards will be written out as empty files. For this reason we + // use a side input here. + PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); + singletonCollection + .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info("Finalizing write operation {}.", writeOperation); + List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if + // set. + int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; + } + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of " + + "{}.", extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + FileResult emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(sideInputs.build())); + } + return PDone.in(input.getPipeline()); + } +}
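For reference, a minimal sketch of driving the renamed transform end to end, using the SimpleSink test sink added later in this commit; the example class, output path, and shard count are illustrative and not part of the change (SimpleSink is package-private, so the sketch assumes it runs from the same package):

    package org.apache.beam.sdk.io;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class WriteFilesExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Write three records through the header/footer-emitting SimpleSink, forcing two shards.
        p.apply(Create.of("a", "b", "c"))
            .apply(WriteFiles.to(new SimpleSink("/tmp/writefiles-example", "")).withNumShards(2));
        p.run();
      }
    }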
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java index a53fb86..6ced5d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java @@ -35,8 +35,8 @@ import org.apache.beam.sdk.values.PDone; public class XmlIO { // CHECKSTYLE.OFF: JavadocStyle /** - * Reads XML files. This source reads one or more XML files and - * creates a {@link PCollection} of a given type. Please note the example given below. + * Reads XML files. This source reads one or more XML files and creates a {@link PCollection} of a + * given type. Please note the example given below. * * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML * element names that are defined by the user: @@ -88,7 +88,8 @@ public class XmlIO { * Apache Beam. * * <h3>Permissions</h3> - * Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner + * + * <p>Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of * corresponding {@link PipelineRunner PipelineRunners} for more details. * @@ -105,8 +106,8 @@ public class XmlIO { // CHECKSTYLE.OFF: JavadocStyle /** - * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of - * records from JAXB-annotated classes to a single file location. + * A {@link FileBasedSink} that outputs records as XML-formatted elements. Writes a {@link + * PCollection} of records from JAXB-annotated classes to a single file location. * * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, * this Sink will produce a single file consisting of a single root element that contains all of @@ -268,6 +269,7 @@ public class XmlIO { /** * Determine if a given filename matches a compression type based on its extension. + * * @param filename the filename to match * @return true iff the filename ends with the compression type's known extension. */ @@ -277,8 +279,8 @@ public class XmlIO { } /** - * Reads a single XML file or a set of XML files defined by a Java "glob" - * file pattern. Each XML file should be of the form defined in {@link #read}. + * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML + * file should be of the form defined in {@link #read}. */ public Read<T> from(String fileOrPatternSpec) { return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build(); @@ -322,9 +324,9 @@ public class XmlIO { /** * Decompresses all input files using the specified compression type. * - * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. - * In this mode, the compression type of the file is determined by its extension. - * Supports .gz, .bz2, .zip and .deflate compression. + * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. In this + * mode, the compression type of the file is determined by its extension. Supports .gz, .bz2, + * .zip and .deflate compression. 
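+ * + * <p>For example, reading gzipped XML records into a hypothetical JAXB-annotated {@code Record} class might look like: + * + * <pre>{@code + * PCollection<Record> records = p.apply(XmlIO.<Record>read() + * .from("input.xml.gz") + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class) + * .withCompressionType(CompressionType.GZIP)); + * }</pre>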
*/ public Read<T> withCompressionType(CompressionType compressionType) { return toBuilder().setCompressionType(compressionType).build(); @@ -415,7 +417,6 @@ public class XmlIO { abstract Write<T> build(); } - /** * Writes to files with the given path prefix. * @@ -435,9 +436,7 @@ public class XmlIO { return toBuilder().setRecordClass(recordClass).build(); } - /** - * Sets the enclosing root element for the generated XML files. - */ + /** Sets the enclosing root element for the generated XML files. */ public Write<T> withRootElement(String rootElement) { return toBuilder().setRootElement(rootElement).build(); } @@ -456,7 +455,7 @@ public class XmlIO { @Override public PDone expand(PCollection<T> input) { - return input.apply(org.apache.beam.sdk.io.Write.to(createSink())); + return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); } @VisibleForTesting @@ -468,10 +467,10 @@ public class XmlIO { public void populateDisplayData(DisplayData.Builder builder) { createSink().populateFileBasedDisplayData(builder); builder - .addIfNotNull(DisplayData.item("rootElement", getRootElement()) - .withLabel("XML Root Element")) - .addIfNotNull(DisplayData.item("recordClass", getRecordClass()) - .withLabel("XML Record Class")); + .addIfNotNull( + DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java index 7700329..b890908 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java @@ -65,7 +65,7 @@ class XmlSink<T> extends FileBasedSink<T> { } /** - * {@link Sink.WriteOperation} for XML {@link Sink}s. + * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s. */ protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> { public XmlWriteOperation(XmlSink<T> sink) { @@ -97,7 +97,7 @@ class XmlSink<T> extends FileBasedSink<T> { } /** - * A {@link Sink.Writer} that can write objects as XML elements. + * A {@link FileBasedWriter} that can write objects as XML elements. */ protected static final class XmlWriter<T> extends FileBasedWriter<T> { final Marshaller marshaller; http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java index eb5db20..1e5c4dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java @@ -20,11 +20,12 @@ package org.apache.beam.sdk.values; import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.transforms.PTransform; /** * {@link PDone} is the output of a {@link PTransform} that has a trivial result, - * such as a {@link org.apache.beam.sdk.io.Write}. + * such as a {@link WriteFiles}. 
*/ public class PDone extends POutputValueBase { http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 5b81ba8..65fb8ba 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -533,79 +533,6 @@ public class FileBasedSinkTest { } /** - * A simple FileBasedSink that writes String values as lines with header and footer lines. - */ - private static final class SimpleSink extends FileBasedSink<String> { - public SimpleSink(String baseOutputFilename, String extension) { - super(baseOutputFilename, extension); - } - - public SimpleSink(String baseOutputFilename, String extension, - WritableByteChannelFactory writableByteChannelFactory) { - super(baseOutputFilename, extension, writableByteChannelFactory); - } - - public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) { - super(baseOutputFilename, extension, fileNamingTemplate); - } - - @Override - public SimpleWriteOperation createWriteOperation(PipelineOptions options) { - return new SimpleWriteOperation(this); - } - - private static final class SimpleWriteOperation extends FileBasedWriteOperation<String> { - public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) { - super(sink, tempOutputFilename); - } - - public SimpleWriteOperation(SimpleSink sink) { - super(sink); - } - - @Override - public SimpleWriter createWriter(PipelineOptions options) throws Exception { - return new SimpleWriter(this); - } - } - - private static final class SimpleWriter extends FileBasedWriter<String> { - static final String HEADER = "header"; - static final String FOOTER = "footer"; - - private WritableByteChannel channel; - - public SimpleWriter(SimpleWriteOperation writeOperation) { - super(writeOperation); - } - - private static ByteBuffer wrap(String value) throws Exception { - return ByteBuffer.wrap((value + "\n").getBytes("UTF-8")); - } - - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - this.channel = channel; - } - - @Override - protected void writeHeader() throws Exception { - channel.write(wrap(HEADER)); - } - - @Override - protected void writeFooter() throws Exception { - channel.write(wrap(FOOTER)); - } - - @Override - public void write(String value) throws Exception { - channel.write(wrap(value)); - } - } - } - - /** * Build a SimpleSink with default options. */ private SimpleSink buildSink() { http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java new file mode 100644 index 0000000..e3cd9b6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * A simple FileBasedSink that writes String values as lines with header and footer lines. + */ +class SimpleSink extends FileBasedSink<String> { + public SimpleSink(String baseOutputFilename, String extension) { + super(baseOutputFilename, extension); + } + + public SimpleSink(String baseOutputFilename, String extension, + WritableByteChannelFactory writableByteChannelFactory) { + super(baseOutputFilename, extension, writableByteChannelFactory); + } + + public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) { + super(baseOutputFilename, extension, fileNamingTemplate); + } + + @Override + public SimpleWriteOperation createWriteOperation(PipelineOptions options) { + return new SimpleWriteOperation(this); + } + + static final class SimpleWriteOperation extends FileBasedWriteOperation<String> { + public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) { + super(sink, tempOutputFilename); + } + + public SimpleWriteOperation(SimpleSink sink) { + super(sink); + } + + @Override + public SimpleWriter createWriter(PipelineOptions options) throws Exception { + return new SimpleWriter(this); + } + } + + static final class SimpleWriter extends FileBasedWriter<String> { + static final String HEADER = "header"; + static final String FOOTER = "footer"; + + private WritableByteChannel channel; + + public SimpleWriter(SimpleWriteOperation writeOperation) { + super(writeOperation); + } + + private static ByteBuffer wrap(String value) throws Exception { + return ByteBuffer.wrap((value + "\n").getBytes("UTF-8")); + } + + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + this.channel = channel; + } + + @Override + protected void writeHeader() throws Exception { + channel.write(wrap(HEADER)); + } + + @Override + protected void writeFooter() throws Exception { + channel.write(wrap(FOOTER)); + } + + @Override + public void write(String value) throws Exception { + channel.write(wrap(value)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java new file mode 100644 index 0000000..ea0395d --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.SimpleSink.SimpleWriter; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Top; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for the WriteFiles PTransform. 
+ */ +@RunWith(JUnit4.class) +public class WriteFilesTest { + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule + public final TestPipeline p = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @SuppressWarnings("unchecked") // covariant cast + private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = + (PTransform) + MapElements.via( + new SimpleFunction<String, String>() { + @Override + public String apply(String input) { + return input; + } + }); + + private static final PTransform<PCollection<String>, PCollectionView<Integer>> + SHARDING_TRANSFORM = + new PTransform<PCollection<String>, PCollectionView<Integer>>() { + @Override + public PCollectionView<Integer> expand(PCollection<String> input) { + return null; + } + }; + + private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> { + private final Window<T> window; + + public WindowAndReshuffle(Window<T> window) { + this.window = window; + } + + private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { + + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); + } + } + + private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { + + @ProcessElement + public void processElement(ProcessContext c) { + for (T s : c.element().getValue()) { + c.output(s); + } + } + } + + @Override + public PCollection<T> expand(PCollection<T> input) { + return input + .apply(window) + .apply(ParDo.of(new AddArbitraryKey<T>())) + .apply(GroupByKey.<Integer, T>create()) + .apply(ParDo.of(new RemoveArbitraryKey<T>())); + } + } + + private String appendToTempFolder(String filename) { + return Paths.get(tmpFolder.getRoot().getPath(), filename).toString(); + } + + private String getBaseOutputFilename() { + return appendToTempFolder("baseoutput"); + } + + /** + * Test a WriteFiles transform with a PCollection of elements. + */ + @Test + @Category(NeedsRunner.class) + public void testWrite() throws IOException { + List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", + "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); + } + + /** + * Test that WriteFiles with an empty input still produces one shard. + */ + @Test + @Category(NeedsRunner.class) + public void testEmptyWrite() throws IOException { + runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename()); + checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), + Optional.of(1)); + } + + /** + * Test that WriteFiles with a configured number of shards produces the desired number of shards + * even when there are many elements. + */ + @Test + @Category(NeedsRunner.class) + public void testShardedWrite() throws IOException { + runShardedWrite( + Arrays.asList("one", "two", "three", "four", "five", "six"), + IDENTITY_MAP, + getBaseOutputFilename(), + Optional.of(1)); + } + + @Test + @Category(NeedsRunner.class) + public void testCustomShardedWrite() throws IOException { + // Flag to validate that the pipeline options are passed to the Sink + WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); + options.setTestFlag("test_value"); + Pipeline p = TestPipeline.create(options); + + List<String> inputs = new ArrayList<>(); + // Prepare timestamps for the elements. 
+ List<Long> timestamps = new ArrayList<>(); + for (long i = 0; i < 1000; i++) { + inputs.add(Integer.toString(3)); + timestamps.add(i + 1); + } + + SimpleSink sink = new SimpleSink(getBaseOutputFilename(), ""); + WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt()); + p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) + .apply(IDENTITY_MAP) + .apply(write); + + p.run(); + + checkFileContents(getBaseOutputFilename(), inputs, Optional.of(3)); + } + + /** + * Test that WriteFiles with a configured number of shards produces the desired number of shards + * even when there are too few elements. + */ + @Test + @Category(NeedsRunner.class) + public void testExpandShardedWrite() throws IOException { + runShardedWrite( + Arrays.asList("one", "two", "three", "four", "five", "six"), + IDENTITY_MAP, + getBaseOutputFilename(), + Optional.of(20)); + } + + /** + * Test a WriteFiles transform with an empty PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testWriteWithEmptyPCollection() throws IOException { + List<String> inputs = new ArrayList<>(); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); + } + + /** + * Test a WriteFiles with a windowed PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testWriteWindowed() throws IOException { + List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", + "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + runWrite( + inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), + getBaseOutputFilename()); + } + + /** + * Test a WriteFiles with sessions. + */ + @Test + @Category(NeedsRunner.class) + public void testWriteWithSessions() throws IOException { + List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", + "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + + runWrite( + inputs, + new WindowAndReshuffle<>( + Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), + getBaseOutputFilename()); + } + + @Test + public void testBuildWrite() { + SimpleSink sink = new SimpleSink(getBaseOutputFilename(), ""); + WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3); + assertThat((SimpleSink) write.getSink(), is(sink)); + PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = + write.getSharding(); + + assertThat(write.getSharding(), is(nullValue())); + assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class)); + assertThat(write.getNumShards().get(), equalTo(3)); + assertThat(write.getSharding(), equalTo(originalSharding)); + + WriteFiles<String> write2 = write.withSharding(SHARDING_TRANSFORM); + assertThat((SimpleSink) write2.getSink(), is(sink)); + assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM)); + // original unchanged + + WriteFiles<String> writeUnsharded = write2.withRunnerDeterminedSharding(); + assertThat(writeUnsharded.getSharding(), nullValue()); + assertThat(write.getSharding(), equalTo(originalSharding)); + } + + @Test + public void testDisplayData() { + SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = WriteFiles.to(sink); + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includesDisplayDataFor("sink", sink)); + } + + @Test + 
public void testShardedDisplayData() { + SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = WriteFiles.to(sink).withNumShards(1); + DisplayData displayData = DisplayData.from(write); + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includesDisplayDataFor("sink", sink)); + assertThat(displayData, hasDisplayItem("numShards", "1")); + } + + @Test + public void testCustomShardStrategyDisplayData() { + SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + WriteFiles<String> write = + WriteFiles.to(sink) + .withSharding( + new PTransform<PCollection<String>, PCollectionView<Integer>>() { + @Override + public PCollectionView<Integer> expand(PCollection<String> input) { + return null; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("spam", "ham")); + } + }); + DisplayData displayData = DisplayData.from(write); + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includesDisplayDataFor("sink", sink)); + assertThat(displayData, hasDisplayItem("spam", "ham")); + } + + /** + * Performs a WriteFiles transform and verifies the WriteFiles transform calls the appropriate + * methods on a test sink in the correct order, as well as verifies that the elements of a + * PCollection are written to the sink. + */ + private static void runWrite( + List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, + String baseName) throws IOException { + runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent()); + } + + /** + * Performs a WriteFiles transform with the desired number of shards. Verifies the WriteFiles + * transform calls the appropriate methods on a test sink in the correct order, as well as + * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards + * is not null, also verifies that the output number of shards is correct. + */ + private static void runShardedWrite( + List<String> inputs, + PTransform<PCollection<String>, PCollection<String>> transform, + String baseName, + Optional<Integer> numConfiguredShards) throws IOException { + // Flag to validate that the pipeline options are passed to the Sink + WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); + options.setTestFlag("test_value"); + Pipeline p = TestPipeline.create(options); + + // Prepare timestamps for the elements. 
+ List<Long> timestamps = new ArrayList<>(); + for (long i = 0; i < inputs.size(); i++) { + timestamps.add(i + 1); + } + + SimpleSink sink = new SimpleSink(baseName, ""); + WriteFiles<String> write = WriteFiles.to(sink); + if (numConfiguredShards.isPresent()) { + write = write.withNumShards(numConfiguredShards.get()); + } + p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) + .apply(transform) + .apply(write); + p.run(); + + checkFileContents(baseName, inputs, numConfiguredShards); + } + + static void checkFileContents(String baseName, List<String> inputs, + Optional<Integer> numExpectedShards) throws IOException { + List<File> outputFiles = Lists.newArrayList(); + final String pattern = baseName + "*"; + for (String outputFileName : IOChannelUtils.getFactory(pattern).match(pattern)) { + outputFiles.add(new File(outputFileName)); + } + if (numExpectedShards.isPresent()) { + assertEquals(numExpectedShards.get().intValue(), outputFiles.size()); + } + + List<String> actual = Lists.newArrayList(); + for (File outputFile : outputFiles) { + try (BufferedReader reader = new BufferedReader(new FileReader(outputFile))) { + for (;;) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (!line.equals(SimpleWriter.HEADER) && !line.equals(SimpleWriter.FOOTER)) { + actual.add(line); + } + } + } + } + assertThat(actual, containsInAnyOrder(inputs.toArray())); + } + + /** + * Options for test, exposed for PipelineOptionsFactory. + */ + public interface WriteOptions extends TestPipelineOptions { + @Description("Test flag and value") + String getTestFlag(); + void setTestFlag(String value); + } + + /** + * Outputs the largest integer in a {@link PCollection} into a {@link PCollectionView}. The input + * {@link PCollection} must be convertible to integers via {@link Integer#valueOf(String)} + */ + private static class LargestInt + extends PTransform<PCollection<String>, PCollectionView<Integer>> { + @Override + public PCollectionView<Integer> expand(PCollection<String> input) { + return input + .apply( + ParDo.of( + new DoFn<String, Integer>() { + @ProcessElement + public void toInteger(ProcessContext ctxt) { + ctxt.output(Integer.valueOf(ctxt.element())); + } + })) + .apply(Top.<Integer>largest(1)) + .apply(Flatten.<Integer>iterables()) + .apply(View.<Integer>asSingleton()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java deleted file mode 100644 index 16d7f2a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ /dev/null @@ -1,705 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Optional; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Sink.WriteOperation; -import org.apache.beam.sdk.io.Sink.Writer; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Top; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for the Write PTransform. 
- */ -@RunWith(JUnit4.class) -public class WriteTest { - @Rule public final TestPipeline p = TestPipeline.create(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - // Static store that can be accessed within the writer - private static List<String> sinkContents = new ArrayList<>(); - // Static count of output shards - private static AtomicInteger numShards = new AtomicInteger(0); - // Static counts of the number of records per shard. - private static List<Integer> recordsPerShard = new ArrayList<>(); - - @SuppressWarnings("unchecked") // covariant cast - private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = - (PTransform) - MapElements.via( - new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return input; - } - }); - - private static final PTransform<PCollection<String>, PCollectionView<Integer>> - SHARDING_TRANSFORM = - new PTransform<PCollection<String>, PCollectionView<Integer>>() { - @Override - public PCollectionView<Integer> expand(PCollection<String> input) { - return null; - } - }; - - private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final Window<T> window; - public WindowAndReshuffle(Window<T> window) { - this.window = window; - } - - private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { - - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); - } - } - - private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { - - @ProcessElement - public void processElement(ProcessContext c) { - for (T s : c.element().getValue()) { - c.output(s); - } - } - } - - @Override - public PCollection<T> expand(PCollection<T> input) { - return input - .apply(window) - .apply(ParDo.of(new AddArbitraryKey<T>())) - .apply(GroupByKey.<Integer, T>create()) - .apply(ParDo.of(new RemoveArbitraryKey<T>())); - } - } - - /** - * Test a Write transform with a PCollection of elements. - */ - @Test - @Category(NeedsRunner.class) - public void testWrite() { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite(inputs, IDENTITY_MAP); - } - - /** - * Test that Write with an empty input still produces one shard. - */ - @Test - @Category(NeedsRunner.class) - public void testEmptyWrite() { - runWrite(Collections.<String>emptyList(), IDENTITY_MAP); - // Note we did not request a sharded write, so runWrite will not validate the number of shards. - assertThat(numShards.intValue(), greaterThan(0)); - } - - /** - * Test that Write with a configured number of shards produces the desired number of shards even - * when there are many elements. - */ - @Test - @Category(NeedsRunner.class) - public void testShardedWrite() { - runShardedWrite( - Arrays.asList("one", "two", "three", "four", "five", "six"), - IDENTITY_MAP, - Optional.of(1)); - } - - @Test - @Category(NeedsRunner.class) - public void testCustomShardedWrite() { - // Flag to validate that the pipeline options are passed to the Sink - WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); - options.setTestFlag("test_value"); - Pipeline p = TestPipeline.create(options); - - // Clear the sink's contents. - sinkContents.clear(); - // Reset the number of shards produced. - numShards.set(0); - // Reset the number of records in each shard. 
- recordsPerShard.clear(); - - List<String> inputs = new ArrayList<>(); - // Prepare timestamps for the elements. - List<Long> timestamps = new ArrayList<>(); - for (long i = 0; i < 1000; i++) { - inputs.add(Integer.toString(3)); - timestamps.add(i + 1); - } - - TestSink sink = new TestSink(); - Write<String> write = Write.to(sink).withSharding(new LargestInt()); - p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) - .apply(IDENTITY_MAP) - .apply(write); - - p.run(); - assertThat(sinkContents, containsInAnyOrder(inputs.toArray())); - assertTrue(sink.hasCorrectState()); - // Every element of the PCollection equals three, so the sharding strategy should choose three shards. - assertEquals(3, numShards.intValue()); - assertEquals(3, recordsPerShard.size()); - } - - /** - * Test that Write with a configured number of shards produces the desired number of shards even - * when there are too few elements. - */ - @Test - @Category(NeedsRunner.class) - public void testExpandShardedWrite() { - runShardedWrite( - Arrays.asList("one", "two", "three", "four", "five", "six"), - IDENTITY_MAP, - Optional.of(20)); - } - - /** - * Tests that a Write can balance many elements. - */ - @Test - @Category(NeedsRunner.class) - public void testShardedWriteBalanced() { - int numElements = 1000; - List<String> inputs = new ArrayList<>(numElements); - for (int i = 0; i < numElements; ++i) { - inputs.add(String.format("elt%04d", i)); - } - - int numShards = 10; - runShardedWrite( - inputs, - new WindowAndReshuffle<>( - Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), - Optional.of(numShards)); - - // Check that both the min and max number of results per shard are close to the expected value. - int min = Integer.MAX_VALUE; - int max = Integer.MIN_VALUE; - for (Integer i : recordsPerShard) { - min = Math.min(min, i); - max = Math.max(max, i); - } - double expected = numElements / (double) numShards; - assertThat((double) min, Matchers.greaterThanOrEqualTo(expected * 0.6)); - assertThat((double) max, Matchers.lessThanOrEqualTo(expected * 1.4)); - } - - /** - * Test a Write transform with an empty PCollection. - */ - @Test - @Category(NeedsRunner.class) - public void testWriteWithEmptyPCollection() { - List<String> inputs = new ArrayList<>(); - runWrite(inputs, IDENTITY_MAP); - } - - /** - * Test a Write with a windowed PCollection. - */ - @Test - @Category(NeedsRunner.class) - public void testWriteWindowed() { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite( - inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2))))); - } - - /** - * Test a Write with sessions.
- */ - @Test - @Category(NeedsRunner.class) - public void testWriteWithSessions() { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - - runWrite( - inputs, - new WindowAndReshuffle<>( - Window.<String>into(Sessions.withGapDuration(Duration.millis(1))))); - } - - @Test - public void testBuildWrite() { - Sink<String> sink = new TestSink() {}; - Write<String> write = Write.to(sink).withNumShards(3); - assertThat(write.getSink(), is(sink)); - PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding = - write.getSharding(); - - assertThat(write.getSharding(), is(nullValue())); - assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class)); - assertThat(write.getNumShards().get(), equalTo(3)); - assertThat(write.getSharding(), equalTo(originalSharding)); - - Write<String> write2 = write.withSharding(SHARDING_TRANSFORM); - assertThat(write2.getSink(), is(sink)); - assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM)); - // original unchanged - - Write<String> writeUnsharded = write2.withRunnerDeterminedSharding(); - assertThat(writeUnsharded.getSharding(), nullValue()); - assertThat(write.getSharding(), equalTo(originalSharding)); - } - - @Test - public void testDisplayData() { - TestSink sink = new TestSink() { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - Write<String> write = Write.to(sink); - DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("sink", sink.getClass())); - assertThat(displayData, includesDisplayDataFor("sink", sink)); - } - - @Test - public void testShardedDisplayData() { - TestSink sink = new TestSink() { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - Write<String> write = Write.to(sink).withNumShards(1); - DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("sink", sink.getClass())); - assertThat(displayData, includesDisplayDataFor("sink", sink)); - assertThat(displayData, hasDisplayItem("numShards", "1")); - } - - @Test - public void testCustomShardStrategyDisplayData() { - TestSink sink = new TestSink() { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - Write<String> write = - Write.to(sink) - .withSharding( - new PTransform<PCollection<String>, PCollectionView<Integer>>() { - @Override - public PCollectionView<Integer> expand(PCollection<String> input) { - return null; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("spam", "ham")); - } - }); - DisplayData displayData = DisplayData.from(write); - assertThat(displayData, hasDisplayItem("sink", sink.getClass())); - assertThat(displayData, includesDisplayDataFor("sink", sink)); - assertThat(displayData, hasDisplayItem("spam", "ham")); - } - - /** - * Performs a Write transform and verifies the Write transform calls the appropriate methods on - * a test sink in the correct order, as well as verifies that the elements of a PCollection are - * written to the sink. 
*/ - private static void runWrite( - List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) { - runShardedWrite(inputs, transform, Optional.<Integer>absent()); - } - - /** - * Performs a Write transform with the desired number of shards. Verifies the Write transform - * calls the appropriate methods on a test sink in the correct order, as well as verifies that - * the elements of a PCollection are written to the sink. If numConfiguredShards is present, also - * verifies that the output number of shards is correct. - */ - private static void runShardedWrite( - List<String> inputs, - PTransform<PCollection<String>, PCollection<String>> transform, - Optional<Integer> numConfiguredShards) { - // Flag to validate that the pipeline options are passed to the Sink - WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); - options.setTestFlag("test_value"); - Pipeline p = TestPipeline.create(options); - - // Clear the sink's contents. - sinkContents.clear(); - // Reset the number of shards produced. - numShards.set(0); - // Reset the number of records in each shard. - recordsPerShard.clear(); - - // Prepare timestamps for the elements. - List<Long> timestamps = new ArrayList<>(); - for (long i = 0; i < inputs.size(); i++) { - timestamps.add(i + 1); - } - - TestSink sink = new TestSink(); - Write<String> write = Write.to(sink); - if (numConfiguredShards.isPresent()) { - write = write.withNumShards(numConfiguredShards.get()); - } - p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) - .apply(transform) - .apply(write); - - p.run(); - assertThat(sinkContents, containsInAnyOrder(inputs.toArray())); - assertTrue(sink.hasCorrectState()); - if (numConfiguredShards.isPresent()) { - assertEquals(numConfiguredShards.get().intValue(), numShards.intValue()); - assertEquals(numConfiguredShards.get().intValue(), recordsPerShard.size()); - } - } - - // Test sink and associated write operation and writer. TestSink, TestSinkWriteOperation, and - // TestSinkWriter each verify that the sequence of method calls is consistent with the - // specification of the Write PTransform. - private static class TestSink extends Sink<String> { - private boolean createCalled = false; - private boolean validateCalled = false; - - @Override - public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) { - assertTrue(validateCalled); - assertTestFlagPresent(options); - createCalled = true; - return new TestSinkWriteOperation(this); - } - - @Override - public void validate(PipelineOptions options) { - assertTestFlagPresent(options); - validateCalled = true; - } - - private void assertTestFlagPresent(PipelineOptions options) { - assertEquals("test_value", options.as(WriteOptions.class).getTestFlag()); - } - - private boolean hasCorrectState() { - return validateCalled && createCalled; - } - - /** - * Implementation of equals() that indicates all test sinks are equal.
*/ - @Override - public boolean equals(Object other) { - return (other instanceof TestSink); - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("createCalled", createCalled) - .add("validateCalled", validateCalled) - .toString(); - } - } - - private static class TestSinkWriteOperation extends WriteOperation<String, TestWriterResult> { - private enum State { - INITIAL, - INITIALIZED, - FINALIZED - } - - // Must be static in case the WriteOperation is serialized before its coder is obtained. - // If this occurs, the value will be modified but not reflected in the WriteOperation that is - // executed by the runner, and the finalize method will fail. - private static volatile boolean coderCalled = false; - - private State state = State.INITIAL; - - private final TestSink sink; - private final UUID id = UUID.randomUUID(); - - public TestSinkWriteOperation(TestSink sink) { - this.sink = sink; - } - - @Override - public TestSink getSink() { - return sink; - } - - @Override - public void initialize(PipelineOptions options) throws Exception { - assertEquals("test_value", options.as(WriteOptions.class).getTestFlag()); - assertThat(state, anyOf(equalTo(State.INITIAL), equalTo(State.INITIALIZED))); - state = State.INITIALIZED; - } - - @Override - public void setWindowedWrites(boolean windowedWrites) { - } - - @Override - public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options) - throws Exception { - assertEquals("test_value", options.as(WriteOptions.class).getTestFlag()); - assertEquals(State.INITIALIZED, state); - // The coder for the test writer results should've been requested by now. - assertTrue(coderCalled); - Set<String> idSet = new HashSet<>(); - int resultCount = 0; - state = State.FINALIZED; - for (TestWriterResult result : bundleResults) { - resultCount += 1; - idSet.add(result.uId); - // Add the elements that were written to the sink's contents. - sinkContents.addAll(result.elementsWritten); - recordsPerShard.add(result.elementsWritten.size()); - } - // Each result came from a unique id. - assertEquals(resultCount, idSet.size()); - } - - @Override - public Writer<String, TestWriterResult> createWriter(PipelineOptions options) { - return new TestSinkWriter(this); - } - - @Override - public Coder<TestWriterResult> getWriterResultCoder() { - coderCalled = true; - return SerializableCoder.of(TestWriterResult.class); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .add("sink", sink) - .add("state", state) - .add("coderCalled", coderCalled) - .toString(); - } - - /** - * Implementation of equals() that does not depend on the state of the write operation, - * but only its specification. In general, write operations will have interesting - * specifications, but for a {@link TestSinkWriteOperation}, that is not the case. Instead, - * a unique identifier (that is serialized along with it) is used to simulate such a - * specification.
- */ - @Override - public boolean equals(Object other) { - if (!(other instanceof TestSinkWriteOperation)) { - return false; - } - TestSinkWriteOperation otherOperation = (TestSinkWriteOperation) other; - return sink.equals(otherOperation.sink) - && id.equals(otherOperation.id); - } - - @Override - public int hashCode() { - return Objects.hash(id, sink); - } - } - - private static class TestWriterResult implements Serializable { - String uId; - List<String> elementsWritten; - - public TestWriterResult(String uId, List<String> elementsWritten) { - this.uId = uId; - this.elementsWritten = elementsWritten; - } - } - - private static class TestSinkWriter extends Writer<String, TestWriterResult> { - private enum State { - INITIAL, - OPENED, - WRITING, - CLOSED - } - - private State state = State.INITIAL; - private List<String> elementsWritten = new ArrayList<>(); - private String uId; - - private final TestSinkWriteOperation writeOperation; - - public TestSinkWriter(TestSinkWriteOperation writeOperation) { - this.writeOperation = writeOperation; - } - - @Override - public TestSinkWriteOperation getWriteOperation() { - return writeOperation; - } - - @Override - public final void openWindowed(String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int nShards) throws Exception { - numShards.incrementAndGet(); - this.uId = uId; - assertEquals(State.INITIAL, state); - state = State.OPENED; - } - - @Override - public final void openUnwindowed(String uId, - int shard, - int nShards) throws Exception { - numShards.incrementAndGet(); - this.uId = uId; - assertEquals(State.INITIAL, state); - state = State.OPENED; - } - - @Override - public void write(String value) throws Exception { - assertThat(state, anyOf(equalTo(State.OPENED), equalTo(State.WRITING))); - state = State.WRITING; - elementsWritten.add(value); - } - - @Override - public TestWriterResult close() throws Exception { - assertThat(state, anyOf(equalTo(State.OPENED), equalTo(State.WRITING))); - state = State.CLOSED; - return new TestWriterResult(uId, elementsWritten); - } - - @Override - public void cleanup() throws Exception { - } - } - - - /** - * Options for test, exposed for PipelineOptionsFactory. - */ - public interface WriteOptions extends TestPipelineOptions { - @Description("Test flag and value") - String getTestFlag(); - void setTestFlag(String value); - } - - /** - * Outputs the largest integer in a {@link PCollection} into a {@link PCollectionView}. 
The input - * {@link PCollection} must be convertible to integers via {@link Integer#valueOf(String)}. - */ - private static class LargestInt - extends PTransform<PCollection<String>, PCollectionView<Integer>> { - @Override - public PCollectionView<Integer> expand(PCollection<String> input) { - return input - .apply( - ParDo.of( - new DoFn<String, Integer>() { - @ProcessElement - public void toInteger(ProcessContext ctxt) { - ctxt.output(Integer.valueOf(ctxt.element())); - } - })) - .apply(Top.<Integer>largest(1)) - .apply(Flatten.<Integer>iterables()) - .apply(View.<Integer>asSingleton()); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index 2d55005..07b6b4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; @@ -143,7 +143,7 @@ public class TransformTreeTest { assertTrue(visited.add(TransformsSeen.SAMPLE)); assertNotNull(node.getEnclosingNode()); assertTrue(node.isCompositeNode()); - } else if (transform instanceof Write) { + } else if (transform instanceof WriteFiles) { assertTrue(visited.add(TransformsSeen.WRITE)); assertNotNull(node.getEnclosingNode()); assertTrue(node.isCompositeNode()); @@ -165,7 +165,7 @@ public class TransformTreeTest { PTransform<?, ?> transform = node.getTransform(); // Pick is a composite, should not be visited here.
assertThat(transform, not(instanceOf(Combine.Globally.class))); - assertThat(transform, not(instanceOf(Write.class))); + assertThat(transform, not(instanceOf(WriteFiles.class))); if (transform instanceof Read.Bounded && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) { assertTrue(visited.add(TransformsSeen.READ)); http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java index 15d61cb..aa9e41e 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -66,7 +65,8 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; /** - * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output + * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based + * output * format. * * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop @@ -75,7 +75,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K * and V. * - * <p>{@code HDFSFileSink} can be used by {@link org.apache.beam.sdk.io.Write} to create write + * <p>{@code HDFSFileSink} can be used by {@link Write} to create a write * transform. See example below. * * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
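[The HDFSFileSink javadoc example itself is truncated at this point in the excerpt. As a rough sketch of the kind of usage that javadoc describes (the text helper HDFSFileSink.toText(String) and its exact signature are assumptions inferred from the "helper methods to write text and Apache Avro" mentioned above, not confirmed by this diff), writing a PCollection of strings to HDFS via the Write transform could look like this:]

    // Sketch only. Assumes HDFSFileSink.toText(String) returns a Sink<String>
    // that writes each element as a line of text; that helper and its
    // signature are an assumption, not confirmed by this excerpt.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class HdfsTextWriteExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("alpha", "beta", "gamma"))
            // Per the javadoc above, a plain Sink such as HDFSFileSink is
            // applied through Write.to(...), while the renamed WriteFiles
            // transform covers FileBasedSinks.
            .apply(Write.to(HDFSFileSink.<String>toText("hdfs://namenode/tmp/out")));
        p.run();
      }
    }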