Repository: beam Updated Branches: refs/heads/master 889776fca -> 5f972e8b2
Unbundle Context and WindowedContext. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64997efa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64997efa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64997efa Branch: refs/heads/master Commit: 64997efa597a6fd74f4a6b6a7ab48d663c56845f Parents: 91c7d3d Author: Reuven Lax <re...@google.com> Authored: Mon Jul 10 21:30:50 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Jul 13 09:29:23 2017 -0700 ---------------------------------------------------------------------- .../examples/common/WriteOneFilePerWindow.java | 19 +- .../complete/game/utils/WriteToText.java | 18 +- .../construction/WriteFilesTranslationTest.java | 12 +- .../beam/sdk/io/DefaultFilenamePolicy.java | 47 ++-- .../org/apache/beam/sdk/io/FileBasedSink.java | 198 ++++---------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 263 ++++++++++--------- .../apache/beam/sdk/io/FileBasedSinkTest.java | 88 +++---- .../org/apache/beam/sdk/io/WriteFilesTest.java | 122 ++++----- 8 files changed, 358 insertions(+), 409 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index 49865ba..abd14b7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -28,7 +28,9 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.joda.time.format.DateTimeFormatter; @@ -88,14 +90,18 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix()); return baseFilename .getCurrentDirectory() @@ -103,7 +109,8 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java index 1d60198..6b7c928 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.joda.time.DateTimeZone; @@ -143,20 +144,25 @@ public class WriteToText<InputT> } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename(int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix()); return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Unsupported."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 283df16..4259ac8 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.junit.Test; @@ -163,13 +165,19 @@ public class WriteFilesTranslationTest { private static class DummyFilenamePolicy extends FilenamePolicy { @Override - public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) { + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } @Nullable @Override - public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Should never be called."); } http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 7a60e49..64d7edc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -52,19 +52,19 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; * with the number of shards, index of the particular file, current window and pane information, * using {@link #constructName}. * - * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced - * uses in generating different files for each window and other sharding controls, see the - * {@code WriteOneFilePerWindow} example pipeline. + * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced uses in generating + * different files for each window and other sharding controls, see the {@code + * WriteOneFilePerWindow} example pipeline. */ public final class DefaultFilenamePolicy extends FilenamePolicy { /** The default sharding name template. */ public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; - /** The default windowed sharding name template used when writing windowed files. - * This is used as default in cases when user did not specify shard template to - * be used and there is a need to write windowed files. In cases when user does - * specify shard template to be used then provided template will be used for both - * windowed and non-windowed file names. + /** + * The default windowed sharding name template used when writing windowed files. This is used as + * default in cases when user did not specify shard template to be used and there is a need to + * write windowed files. In cases when user does specify shard template to be used then provided + * template will be used for both windowed and non-windowed file names. */ private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE = "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE; @@ -190,11 +190,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { * <p>This is a shortcut for: * * <pre>{@code - * DefaultFilenamePolicy.fromParams(new Params() - * .withBaseFilename(baseFilename) - * .withShardTemplate(shardTemplate) - * .withSuffix(filenameSuffix) - * .withWindowedWrites()) + * DefaultFilenamePolicy.fromParams(new Params() + * .withBaseFilename(baseFilename) + * .withShardTemplate(shardTemplate) + * .withSuffix(filenameSuffix) + * .withWindowedWrites()) * }</pre> * * <p>Where the respective {@code with} methods are invoked only if the value is non-null or true. @@ -284,28 +284,33 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { @Override @Nullable - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { return constructName( params.baseFilename.get(), params.shardTemplate, params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), + shardNumber, + numShards, null, null); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - final PaneInfo paneInfo = context.getPaneInfo(); + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { String paneStr = paneInfoToString(paneInfo); - String windowStr = windowToString(context.getWindow()); + String windowStr = windowToString(window); return constructName( params.baseFilename.get(), params.shardTemplate, params.suffix + outputFileHints.getSuggestedFilenameSuffix(), - context.getShardNumber(), - context.getNumShards(), + shardNumber, + numShards, paneStr, windowStr); } http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 583af60..c68b794 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -58,8 +58,6 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; @@ -96,9 +94,9 @@ import org.slf4j.LoggerFactory; * <p>The process of writing to file-based sink is as follows: * * <ol> - * <li>An optional subclass-defined initialization, - * <li>a parallel write of bundles to temporary files, and finally, - * <li>these temporary files are renamed with final output filenames. + * <li>An optional subclass-defined initialization, + * <li>a parallel write of bundles to temporary files, and finally, + * <li>these temporary files are renamed with final output filenames. * </ol> * * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the @@ -125,46 +123,36 @@ import org.slf4j.LoggerFactory; public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class); - /** - * Directly supported file output compression types. - */ + /** Directly supported file output compression types. */ public enum CompressionType implements WritableByteChannelFactory { - /** - * No compression, or any other transformation, will be used. - */ + /** No compression, or any other transformation, will be used. */ UNCOMPRESSED("", null) { @Override public WritableByteChannel create(WritableByteChannel channel) throws IOException { return channel; } }, - /** - * Provides GZip output transformation. - */ + /** Provides GZip output transformation. */ GZIP(".gz", MimeTypes.BINARY) { @Override public WritableByteChannel create(WritableByteChannel channel) throws IOException { return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true)); } }, - /** - * Provides BZip2 output transformation. - */ + /** Provides BZip2 output transformation. */ BZIP2(".bz2", MimeTypes.BINARY) { @Override public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return Channels - .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); + return Channels.newChannel( + new BZip2CompressorOutputStream(Channels.newOutputStream(channel))); } }, - /** - * Provides deflate output transformation. - */ + /** Provides deflate output transformation. */ DEFLATE(".deflate", MimeTypes.BINARY) { @Override public WritableByteChannel create(WritableByteChannel channel) throws IOException { - return Channels - .newChannel(new DeflateCompressorOutputStream(Channels.newOutputStream(channel))); + return Channels.newChannel( + new DeflateCompressorOutputStream(Channels.newOutputStream(channel))); } }; @@ -182,7 +170,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } @Override - @Nullable public String getMimeType() { + @Nullable + public String getMimeType() { return mimeType; } } @@ -213,8 +202,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab /** * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the - * underlying channel. The default is to not compress the output using - * {@link CompressionType#UNCOMPRESSED}. + * underlying channel. The default is to not compress the output using {@link + * CompressionType#UNCOMPRESSED}. */ private final WritableByteChannelFactory writableByteChannelFactory; @@ -285,85 +274,20 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab @Experimental(Kind.FILESYSTEM) public abstract static class FilenamePolicy implements Serializable { /** - * Context used for generating a name based on shard number, and num shards. - * The policy must produce unique filenames for unique {@link Context} objects. - * - * <p>Be careful about adding fields to this as existing strategies will not notice the new - * fields, and may not produce unique filenames. - */ - public static class Context { - private int shardNumber; - private int numShards; - - - public Context(int shardNumber, int numShards) { - this.shardNumber = shardNumber; - this.numShards = numShards; - } - - public int getShardNumber() { - return shardNumber; - } - - - public int getNumShards() { - return numShards; - } - } - - /** - * Context used for generating a name based on window, pane, shard number, and num shards. - * The policy must produce unique filenames for unique {@link WindowedContext} objects. - * - * <p>Be careful about adding fields to this as existing strategies will not notice the new - * fields, and may not produce unique filenames. - */ - public static class WindowedContext { - private int shardNumber; - private int numShards; - private BoundedWindow window; - private PaneInfo paneInfo; - - public WindowedContext( - BoundedWindow window, - PaneInfo paneInfo, - int shardNumber, - int numShards) { - this.window = window; - this.paneInfo = paneInfo; - this.shardNumber = shardNumber; - this.numShards = numShards; - } - - public BoundedWindow getWindow() { - return window; - } - - public PaneInfo getPaneInfo() { - return paneInfo; - } - - public int getShardNumber() { - return shardNumber; - } - - public int getNumShards() { - return numShards; - } - } - - /** * When a sink has requested windowed or triggered output, this method will be invoked to return * the file {@link ResourceId resource} to be created given the base output directory and a * {@link OutputFileHints} containing information about the file, including a suggested * extension (e.g. coming from {@link CompressionType}). * - * <p>The {@link WindowedContext} object gives access to the window and pane, as well as - * sharding information. The policy must return unique and consistent filenames for different - * windows and panes. + * <p>The policy must return unique and consistent filenames for different windows and panes. */ @Experimental(Kind.FILESYSTEM) - public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints); + public abstract ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints); /** * When a sink has not requested windowed or triggered output, this method will be invoked to @@ -371,18 +295,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * a {@link OutputFileHints} containing information about the file, including a suggested (e.g. * coming from {@link CompressionType}). * - * <p>The {@link Context} object only provides sharding information, which is used by the policy - * to generate unique and consistent filenames. + * <p>The shardNumber and numShards parameters, should be used by the policy to generate unique + * and consistent filenames. */ @Experimental(Kind.FILESYSTEM) @Nullable - public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints); + public abstract ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints); - /** - * Populates the display data. - */ - public void populateDisplayData(DisplayData.Builder builder) { - } + /** Populates the display data. */ + public void populateDisplayData(DisplayData.Builder builder) {} } /** The directory to which files will be written. */ @@ -449,11 +371,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * written, * * <ol> - * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the - * output bundles. - * <li>During finalize, these temporary files are copied to final output locations and named - * according to a file naming template. - * <li>Finally, any temporary files that were created during the write are removed. + * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the + * output bundles. + * <li>During finalize, these temporary files are copied to final output locations and named + * according to a file naming template. + * <li>Finally, any temporary files that were created during the write are removed. * </ol> * * <p>Subclass implementations of WriteOperation must implement {@link @@ -558,9 +480,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab */ public abstract Writer<OutputT, DestinationT> createWriter() throws Exception; - /** - * Indicates that the operation will be performing windowed writes. - */ + /** Indicates that the operation will be performing windowed writes. */ public void setWindowedWrites(boolean windowedWrites) { this.windowedWrites = windowedWrites; } @@ -659,9 +579,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab } int numDistinctShards = new HashSet<>(outputFilenames.values()).size(); - checkState(numDistinctShards == outputFilenames.size(), - "Only generated %s distinct file names for %s files.", - numDistinctShards, outputFilenames.size()); + checkState( + numDistinctShards == outputFilenames.size(), + "Only generated %s distinct file names for %s files.", + numDistinctShards, + outputFilenames.size()); return outputFilenames; } @@ -726,8 +648,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab // ignore the exception for now to avoid failing the pipeline. if (shouldRemoveTemporaryDirectory) { try { - MatchResult singleMatch = Iterables.getOnlyElement( - FileSystems.match(Collections.singletonList(tempDir.toString() + "*"))); + MatchResult singleMatch = + Iterables.getOnlyElement( + FileSystems.match(Collections.singletonList(tempDir.toString() + "*"))); for (Metadata matchResult : singleMatch.metadata()) { matches.add(matchResult.resourceId()); } @@ -807,18 +730,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab /** The output file for this bundle. May be null if opening failed. */ private @Nullable ResourceId outputFile; - /** - * The channel to write to. - */ + /** The channel to write to. */ private WritableByteChannel channel; /** * The MIME type used in the creation of the output channel (if the file system supports it). * - * <p>This is the default for the sink, but it may be overridden by a supplied - * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses - * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then - * the MIME type will be overridden to {@link MimeTypes#BINARY}. + * <p>This is the default for the sink, but it may be overridden by a supplied {@link + * WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by + * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to + * {@link MimeTypes#BINARY}. */ private final String mimeType; @@ -843,14 +764,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab */ protected void writeHeader() throws Exception {} - /** - * Writes footer at the end of output files. Nothing by default; subclasses may override. - */ + /** Writes footer at the end of output files. Nothing by default; subclasses may override. */ protected void writeFooter() throws Exception {} /** - * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. - * If any resources opened in the write processes need to be flushed, flush them here. + * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. If + * any resources opened in the write processes need to be flushed, flush them here. */ protected void finishWrite() throws Exception {} @@ -875,9 +794,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab open(uId, window, paneInfo, shard, destination); } - /** - * Called for each value in the bundle. - */ + /** Called for each value in the bundle. */ public abstract void write(OutputT value) throws Exception; /** @@ -982,7 +899,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab checkState( channel.isOpen(), - "Channel %s to %s should only be closed by its owner: %s", channel, outputFile); + "Channel %s to %s should only be closed by its owner: %s", + channel, + outputFile); LOG.debug("Closing channel to {}.", outputFile); try { @@ -1063,10 +982,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination); if (getWindow() != null) { return policy.windowedFilename( - new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards), - outputFileHints); + getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints); } else { - return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints); + return policy.unwindowedFilename(getShard(), numShards, outputFileHints); } } @@ -1154,7 +1072,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab * * @see MimeTypes * @see <a href= - * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a> + * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a> */ @Nullable String getMimeType(); http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 260e47a..4a1386c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -68,8 +68,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; @@ -84,20 +86,15 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for AvroIO Read and Write transforms. - */ +/** Tests for AvroIO Read and Write transforms. */ @RunWith(JUnit4.class) public class AvroIOTest { - @Rule - public TestPipeline p = TestPipeline.create(); + @Rule public TestPipeline p = TestPipeline.create(); - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); + @Rule public ExpectedException expectedException = ExpectedException.none(); @Test public void testAvroIOGetName() { @@ -109,11 +106,14 @@ public class AvroIOTest { static class GenericClass { int intField; String stringField; + public GenericClass() {} + public GenericClass(int intValue, String stringValue) { this.intField = intValue; this.stringField = stringValue; } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) @@ -121,10 +121,12 @@ public class AvroIOTest { .add("stringField", stringField) .toString(); } + @Override public int hashCode() { return Objects.hash(intField, stringField); } + @Override public boolean equals(Object other) { if (other == null || !(other instanceof GenericClass)) { @@ -138,20 +140,16 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) public void testAvroIOWriteAndReadASingleFile() throws Throwable { - List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), - new GenericClass(5, "bar")); + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class) - .to(outputFile.getAbsolutePath()) - .withoutSharding()); + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); p.run(); PCollection<GenericClass> input = - p.apply( - AvroIO.read(GenericClass.class) - .from(outputFile.getAbsolutePath())); + p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); p.run(); @@ -161,25 +159,25 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable { - List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), - new GenericClass(5, "bar")); + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class) - .to(outputFile.getAbsolutePath()) - .withoutSharding() - .withCodec(CodecFactory.deflateCodec(9))); + .apply( + AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) + .withoutSharding() + .withCodec(CodecFactory.deflateCodec(9))); p.run(); - PCollection<GenericClass> input = p - .apply(AvroIO.read(GenericClass.class) - .from(outputFile.getAbsolutePath())); + PCollection<GenericClass> input = + p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); p.run(); - DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), - new GenericDatumReader()); + DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); assertEquals("deflate", dataFileStream.getMetaString("avro.codec")); } @@ -187,25 +185,25 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable { - List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), - new GenericClass(5, "bar")); + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class) - .to(outputFile.getAbsolutePath()) - .withoutSharding() - .withCodec(CodecFactory.nullCodec())); + .apply( + AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) + .withoutSharding() + .withCodec(CodecFactory.nullCodec())); p.run(); - PCollection<GenericClass> input = p - .apply(AvroIO.read(GenericClass.class) - .from(outputFile.getAbsolutePath())); + PCollection<GenericClass> input = + p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); p.run(); - DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), - new GenericDatumReader()); + DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); assertEquals("null", dataFileStream.getMetaString("avro.codec")); } @@ -214,12 +212,15 @@ public class AvroIOTest { int intField; String stringField; @Nullable String nullableField; + public GenericClassV2() {} + public GenericClassV2(int intValue, String stringValue, String nullableValue) { this.intField = intValue; this.stringField = stringValue; this.nullableField = nullableValue; } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) @@ -228,10 +229,12 @@ public class AvroIOTest { .add("nullableField", nullableField) .toString(); } + @Override public int hashCode() { return Objects.hash(intField, stringField, nullableField); } + @Override public boolean equals(Object other) { if (other == null || !(other instanceof GenericClassV2)) { @@ -245,32 +248,28 @@ public class AvroIOTest { } /** - * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the - * schema resolution process succeeds. This test covers the case when a new, {@code @Nullable} - * field has been added. + * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the schema + * resolution process succeeds. This test covers the case when a new, {@code @Nullable} field has + * been added. * * <p>For more information, see http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution */ @Test @Category(NeedsRunner.class) public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable { - List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), - new GenericClass(5, "bar")); + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class) - .to(outputFile.getAbsolutePath()) - .withoutSharding()); + .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); p.run(); - List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), - new GenericClassV2(5, "bar", null)); + List<GenericClassV2> expected = + ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); PCollection<GenericClassV2> input = - p.apply( - AvroIO.read(GenericClassV2.class) - .from(outputFile.getAbsolutePath())); + p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(expected); p.run(); @@ -284,7 +283,12 @@ public class AvroIOTest { } @Override - public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) { + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { String filenamePrefix = outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), ""); @@ -292,11 +296,11 @@ public class AvroIOTest { String.format( "%s-%s-%s-of-%s-pane-%s%s%s", filenamePrefix, - input.getWindow(), - input.getShardNumber(), - input.getNumShards() - 1, - input.getPaneInfo().getIndex(), - input.getPaneInfo().isLast() ? "-final" : "", + window, + shardNumber, + numShards - 1, + paneInfo.getIndex(), + paneInfo.isLast() ? "-final" : "", outputFileHints.getSuggestedFilenameSuffix()); return outputFilePrefix .getCurrentDirectory() @@ -304,7 +308,8 @@ public class AvroIOTest { } @Override - public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { throw new UnsupportedOperationException("Expecting windowed outputs only"); } @@ -316,8 +321,7 @@ public class AvroIOTest { } } - @Rule - public TestPipeline windowedAvroWritePipeline = TestPipeline.create(); + @Rule public TestPipeline windowedAvroWritePipeline = TestPipeline.create(); @Test @Category({ValidatesRunner.class, UsesTestStream.class}) @@ -328,27 +332,31 @@ public class AvroIOTest { Instant base = new Instant(0); ArrayList<GenericClass> allElements = new ArrayList<>(); ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>(); - ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList( - base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)), - base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30))); + ArrayList<Instant> firstWindowTimestamps = + Lists.newArrayList( + base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)), + base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30))); Random random = new Random(); for (int i = 0; i < 100; ++i) { GenericClass item = new GenericClass(i, String.valueOf(i)); allElements.add(item); - firstWindowElements.add(TimestampedValue.of(item, - firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size())))); + firstWindowElements.add( + TimestampedValue.of( + item, firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size())))); } ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>(); - ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList( - base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)), - base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90))); + ArrayList<Instant> secondWindowTimestamps = + Lists.newArrayList( + base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)), + base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90))); for (int i = 100; i < 200; ++i) { GenericClass item = new GenericClass(i, String.valueOf(i)); allElements.add(new GenericClass(i, String.valueOf(i))); - secondWindowElements.add(TimestampedValue.of(item, - secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size())))); + secondWindowElements.add( + TimestampedValue.of( + item, secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size())))); } TimestampedValue<GenericClass>[] firstWindowArray = @@ -356,14 +364,17 @@ public class AvroIOTest { TimestampedValue<GenericClass>[] secondWindowArray = secondWindowElements.toArray(new TimestampedValue[100]); - TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class)) - .advanceWatermarkTo(new Instant(0)) - .addElements(firstWindowArray[0], - Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length)) - .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1))) - .addElements(secondWindowArray[0], - Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) - .advanceWatermarkToInfinity(); + TestStream<GenericClass> values = + TestStream.create(AvroCoder.of(GenericClass.class)) + .advanceWatermarkTo(new Instant(0)) + .addElements( + firstWindowArray[0], + Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1))) + .addElements( + secondWindowArray[0], + Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) + .advanceWatermarkToInfinity(); FilenamePolicy policy = new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename)); @@ -384,11 +395,17 @@ public class AvroIOTest { for (int shard = 0; shard < 2; shard++) { for (int window = 0; window < 2; window++) { Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window)); - IntervalWindow intervalWindow = new IntervalWindow( - windowStart, Duration.standardMinutes(1)); + IntervalWindow intervalWindow = + new IntervalWindow(windowStart, Duration.standardMinutes(1)); expectedFiles.add( - new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard - + "-of-1" + "-pane-0-final")); + new File( + baseFilename + + "-" + + intervalWindow.toString() + + "-" + + shard + + "-of-1" + + "-pane-0-final")); } } @@ -396,9 +413,10 @@ public class AvroIOTest { for (File outputFile : expectedFiles) { assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists()); try (DataFileReader<GenericClass> reader = - new DataFileReader<>(outputFile, - new ReflectDatumReader<GenericClass>( - ReflectData.get().getSchema(GenericClass.class)))) { + new DataFileReader<>( + outputFile, + new ReflectDatumReader<GenericClass>( + ReflectData.get().getSchema(GenericClass.class)))) { Iterators.addAll(actualElements, reader); } outputFile.delete(); @@ -408,25 +426,22 @@ public class AvroIOTest { @Test public void testWriteWithDefaultCodec() throws Exception { - AvroIO.Write<String> write = AvroIO.write(String.class) - .to("/tmp/foo/baz"); + AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz"); assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString()); } @Test public void testWriteWithCustomCodec() throws Exception { - AvroIO.Write<String> write = AvroIO.write(String.class) - .to("/tmp/foo/baz") - .withCodec(CodecFactory.snappyCodec()); + AvroIO.Write<String> write = + AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec()); assertEquals(SNAPPY_CODEC, write.getCodec().toString()); } @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomDeflateCodec() throws Exception { - AvroIO.Write<String> write = AvroIO.write(String.class) - .to("/tmp/foo/baz") - .withCodec(CodecFactory.deflateCodec(9)); + AvroIO.Write<String> write = + AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9)); assertEquals( CodecFactory.deflateCodec(9).toString(), @@ -436,9 +451,8 @@ public class AvroIOTest { @Test @SuppressWarnings("unchecked") public void testWriteWithSerDeCustomXZCodec() throws Exception { - AvroIO.Write<String> write = AvroIO.write(String.class) - .to("/tmp/foo/baz") - .withCodec(CodecFactory.xzCodec(9)); + AvroIO.Write<String> write = + AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9)); assertEquals( CodecFactory.xzCodec(9).toString(), @@ -449,28 +463,32 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testMetadata() throws Exception { - List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), - new GenericClass(5, "bar")); + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); p.apply(Create.of(values)) - .apply(AvroIO.write(GenericClass.class) - .to(outputFile.getAbsolutePath()) - .withoutSharding() - .withMetadata(ImmutableMap.<String, Object>of( - "stringKey", "stringValue", - "longKey", 100L, - "bytesKey", "bytesValue".getBytes()))); + .apply( + AvroIO.write(GenericClass.class) + .to(outputFile.getAbsolutePath()) + .withoutSharding() + .withMetadata( + ImmutableMap.<String, Object>of( + "stringKey", + "stringValue", + "longKey", + 100L, + "bytesKey", + "bytesValue".getBytes()))); p.run(); - DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), - new GenericDatumReader()); + DataFileStream dataFileStream = + new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); assertEquals("stringValue", dataFileStream.getMetaString("stringKey")); assertEquals(100L, dataFileStream.getMetaLong("longKey")); assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey")); } - @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. private void runTestWrite(String[] expectedElements, int numShards) throws IOException { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); @@ -488,8 +506,8 @@ public class AvroIOTest { p.run(); String shardNameTemplate = - firstNonNull(write.getShardTemplate(), - DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); + firstNonNull( + write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate); } @@ -517,8 +535,8 @@ public class AvroIOTest { for (File outputFile : expectedFiles) { assertTrue("Expected output file " + outputFile.getName(), outputFile.exists()); try (DataFileReader<String> reader = - new DataFileReader<>(outputFile, - new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) { + new DataFileReader<>( + outputFile, new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) { Iterators.addAll(actualElements, reader); } } @@ -560,18 +578,21 @@ public class AvroIOTest { AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*"); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("AvroIO.Read should include the file pattern in its primitive transform", - displayData, hasItem(hasDisplayItem("filePattern"))); + assertThat( + "AvroIO.Read should include the file pattern in its primitive transform", + displayData, + hasItem(hasDisplayItem("filePattern"))); } @Test public void testWriteDisplayData() { - AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class) - .to("/foo") - .withShardNameTemplate("-SS-of-NN-") - .withSuffix("bar") - .withNumShards(100) - .withCodec(CodecFactory.snappyCodec()); + AvroIO.Write<GenericClass> write = + AvroIO.write(GenericClass.class) + .to("/foo") + .withShardNameTemplate("-SS-of-NN-") + .withSuffix("bar") + .withNumShards(100) + .withCodec(CodecFactory.snappyCodec()); DisplayData displayData = DisplayData.from(write); http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 755bb59..b756778 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream; import org.apache.beam.sdk.io.FileBasedSink.CompressionType; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; @@ -62,9 +61,7 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link FileBasedSink}. - */ +/** Tests for {@link FileBasedSink}. */ @RunWith(JUnit4.class) public class FileBasedSinkTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -87,14 +84,14 @@ public class FileBasedSinkTest { } /** - * Writer opens the correct file, writes the header, footer, and elements in the correct - * order, and returns the correct filename. + * Writer opens the correct file, writes the header, footer, and elements in the correct order, + * and returns the correct filename. */ @Test public void testWriter() throws Exception { String testUid = "testId"; - ResourceId expectedTempFile = getBaseTempDirectory() - .resolve(testUid, StandardResolveOptions.RESOLVE_FILE); + ResourceId expectedTempFile = + getBaseTempDirectory().resolve(testUid, StandardResolveOptions.RESOLVE_FILE); List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird"); List<String> expected = new ArrayList<>(); expected.add(SimpleSink.SimpleWriter.HEADER); @@ -114,9 +111,7 @@ public class FileBasedSinkTest { assertFileContains(expected, expectedTempFile); } - /** - * Assert that a file contains the lines provided, in the same order as expected. - */ + /** Assert that a file contains the lines provided, in the same order as expected. */ private void assertFileContains(List<String> expected, ResourceId file) throws Exception { try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) { List<String> actual = new ArrayList<>(); @@ -140,9 +135,7 @@ public class FileBasedSinkTest { } } - /** - * Removes temporary files when temporary and output directories differ. - */ + /** Removes temporary files when temporary and output directories differ. */ @Test public void testRemoveWithTempFilename() throws Exception { testRemoveTemporaryFiles(3, getBaseTempDirectory()); @@ -218,7 +211,7 @@ public class FileBasedSinkTest { .getSink() .getDynamicDestinations() .getFilenamePolicy(null) - .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED); + .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED); assertTrue(new File(outputFilename.toString()).exists()); assertFalse(temporaryFiles.get(i).exists()); } @@ -232,8 +225,7 @@ public class FileBasedSinkTest { * Create n temporary and output files and verify that removeTemporaryFiles only removes temporary * files. */ - private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) - throws Exception { + private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception { String prefix = "file"; SimpleSink<Void> sink = SimpleSink.makeSimpleSink( @@ -245,8 +237,7 @@ public class FileBasedSinkTest { List<File> temporaryFiles = new ArrayList<>(); List<File> outputFiles = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - ResourceId tempResource = - WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i); + ResourceId tempResource = WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i); File tmpFile = new File(tempResource.toString()); tmpFile.getParentFile().mkdirs(); assertTrue("not able to create new temp file", tmpFile.createNewFile()); @@ -264,12 +255,9 @@ public class FileBasedSinkTest { for (int i = 0; i < numFiles; i++) { File temporaryFile = temporaryFiles.get(i); assertThat( - String.format("temp file %s exists", temporaryFile), - temporaryFile.exists(), is(false)); + String.format("temp file %s exists", temporaryFile), temporaryFile.exists(), is(false)); File outputFile = outputFiles.get(i); - assertThat( - String.format("output file %s exists", outputFile), - outputFile.exists(), is(true)); + assertThat(String.format("output file %s exists", outputFile), outputFile.exists(), is(true)); } } @@ -279,8 +267,8 @@ public class FileBasedSinkTest { SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3"); List<String> inputContents = Arrays.asList("1", "2", "3"); - List<String> expectedOutputFilenames = Arrays.asList( - "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test"); + List<String> expectedOutputFilenames = + Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test"); Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>(); List<ResourceId> expectedOutputPaths = new ArrayList<>(); @@ -301,8 +289,7 @@ public class FileBasedSinkTest { .getSink() .getDynamicDestinations() .getFilenamePolicy(null) - .unwindowedFilename( - new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED)); + .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED)); } // Copy input files to output files. @@ -319,16 +306,12 @@ public class FileBasedSinkTest { ResourceId outputDirectory, FilenamePolicy policy, int numFiles) { List<ResourceId> filenames = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - filenames.add( - policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED)); + filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED)); } return filenames; } - /** - * Output filenames are generated correctly when an extension is supplied. - */ - + /** Output filenames are generated correctly when an extension is supplied. */ @Test public void testGenerateOutputFilenames() { List<ResourceId> expected; @@ -340,17 +323,17 @@ public class FileBasedSinkTest { root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED); FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); - expected = Arrays.asList( - root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE), - root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE), - root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE) - ); + expected = + Arrays.asList( + root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)); actual = generateDestinationFilenames(root, policy, 3); assertEquals(expected, actual); - expected = Collections.singletonList( - root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE) - ); + expected = + Collections.singletonList( + root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)); actual = generateDestinationFilenames(root, policy, 1); assertEquals(expected, actual); @@ -396,17 +379,17 @@ public class FileBasedSinkTest { root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED); FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); - expected = Arrays.asList( - root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE), - root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE), - root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE) - ); + expected = + Arrays.asList( + root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE), + root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)); actual = generateDestinationFilenames(root, policy, 3); assertEquals(expected, actual); - expected = Collections.singletonList( - root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE) - ); + expected = + Collections.singletonList( + root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)); actual = generateDestinationFilenames(root, policy, 1); assertEquals(expected, actual); @@ -479,9 +462,8 @@ public class FileBasedSinkTest { } } - private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory, - String... values) - throws IOException { + private File writeValuesWithWritableByteChannelFactory( + final WritableByteChannelFactory factory, String... values) throws IOException { final File file = tmpFolder.newFile("test.gz"); final WritableByteChannel channel = factory.create(Channels.newChannel(new FileOutputStream(file))); http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 55f2a87..1ca7169 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -70,8 +70,10 @@ import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -89,17 +91,12 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for the WriteFiles PTransform. - */ +/** Tests for the WriteFiles PTransform. */ @RunWith(JUnit4.class) public class WriteFilesTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public final TestPipeline p = TestPipeline.create(); - @Rule - public ExpectedException thrown = ExpectedException.none(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public final TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); @SuppressWarnings("unchecked") // covariant cast private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP = @@ -114,12 +111,12 @@ public class WriteFilesTest { private static final PTransform<PCollection<String>, PCollectionView<Integer>> SHARDING_TRANSFORM = - new PTransform<PCollection<String>, PCollectionView<Integer>>() { - @Override - public PCollectionView<Integer> expand(PCollection<String> input) { - return null; - } - }; + new PTransform<PCollection<String>, PCollectionView<Integer>>() { + @Override + public PCollectionView<Integer> expand(PCollection<String> input) { + return null; + } + }; private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> { private final Window<T> window; @@ -161,18 +158,20 @@ public class WriteFilesTest { } private String getBaseOutputFilename() { - return getBaseOutputDirectory() - .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString(); + return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString(); } - /** - * Test a WriteFiles transform with a PCollection of elements. - */ + /** Test a WriteFiles transform with a PCollection of elements. */ @Test @Category(NeedsRunner.class) public void testWrite() throws IOException { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + List<String> inputs = + Arrays.asList( + "Critical canary", + "Apprehensive eagle", + "Intimidating pigeon", + "Pedantic gull", + "Frisky finch"); runWrite( inputs, IDENTITY_MAP, @@ -180,9 +179,7 @@ public class WriteFilesTest { WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } - /** - * Test that WriteFiles with an empty input still produces one shard. - */ + /** Test that WriteFiles with an empty input still produces one shard. */ @Test @Category(NeedsRunner.class) public void testEmptyWrite() throws IOException { @@ -191,8 +188,7 @@ public class WriteFilesTest { IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); - checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), - Optional.of(1)); + checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); } /** @@ -212,7 +208,6 @@ public class WriteFilesTest { private ResourceId getBaseOutputDirectory() { return LocalResources.fromFile(tmpFolder.getRoot(), true) .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY); - } private SimpleSink<Void> makeSimpleSink() { @@ -267,9 +262,7 @@ public class WriteFilesTest { .withNumShards(20)); } - /** - * Test a WriteFiles transform with an empty PCollection. - */ + /** Test a WriteFiles transform with an empty PCollection. */ @Test @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws IOException { @@ -281,14 +274,17 @@ public class WriteFilesTest { WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } - /** - * Test a WriteFiles with a windowed PCollection. - */ + /** Test a WriteFiles with a windowed PCollection. */ @Test @Category(NeedsRunner.class) public void testWriteWindowed() throws IOException { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + List<String> inputs = + Arrays.asList( + "Critical canary", + "Apprehensive eagle", + "Intimidating pigeon", + "Pedantic gull", + "Frisky finch"); runWrite( inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), @@ -296,14 +292,17 @@ public class WriteFilesTest { WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())); } - /** - * Test a WriteFiles with sessions. - */ + /** Test a WriteFiles with sessions. */ @Test @Category(NeedsRunner.class) public void testWriteWithSessions() throws IOException { - List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", - "Intimidating pigeon", "Pedantic gull", "Frisky finch"); + List<String> inputs = + Arrays.asList( + "Critical canary", + "Apprehensive eagle", + "Intimidating pigeon", + "Pedantic gull", + "Frisky finch"); runWrite( inputs, @@ -589,19 +588,24 @@ public class WriteFilesTest { public String filenamePrefixForWindow(IntervalWindow window) { String prefix = baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); - return String.format("%s%s-%s", - prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); + return String.format( + "%s%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); } @Override - public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) { - IntervalWindow window = (IntervalWindow) context.getWindow(); + public ResourceId windowedFilename( + int shardNumber, + int numShards, + BoundedWindow window, + PaneInfo paneInfo, + OutputFileHints outputFileHints) { + IntervalWindow intervalWindow = (IntervalWindow) window; String filename = String.format( "%s-%s-of-%s%s%s", - filenamePrefixForWindow(window), - context.getShardNumber(), - context.getNumShards(), + filenamePrefixForWindow(intervalWindow), + shardNumber, + numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix); return baseFilename @@ -610,17 +614,14 @@ public class WriteFilesTest { } @Override - public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + int shardNumber, int numShards, OutputFileHints outputFileHints) { String prefix = baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), ""); String filename = String.format( "%s-%s-of-%s%s%s", - prefix, - context.getShardNumber(), - context.getNumShards(), - outputFileHints.getSuggestedFilenameSuffix(), - suffix); + prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix); return baseFilename .getCurrentDirectory() .resolve(filename, StandardResolveOptions.RESOLVE_FILE); @@ -656,12 +657,14 @@ public class WriteFilesTest { Optional<Integer> numShards = (write.getNumShards() != null) - ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent(); + ? Optional.of(write.getNumShards().get()) + : Optional.<Integer>absent(); checkFileContents(baseName, inputs, numShards); } - static void checkFileContents(String baseName, List<String> inputs, - Optional<Integer> numExpectedShards) throws IOException { + static void checkFileContents( + String baseName, List<String> inputs, Optional<Integer> numExpectedShards) + throws IOException { List<File> outputFiles = Lists.newArrayList(); final String pattern = baseName + "*"; List<Metadata> metadata = @@ -690,12 +693,11 @@ public class WriteFilesTest { assertThat(actual, containsInAnyOrder(inputs.toArray())); } - /** - * Options for test, exposed for PipelineOptionsFactory. - */ + /** Options for test, exposed for PipelineOptionsFactory. */ public interface WriteOptions extends TestPipelineOptions { @Description("Test flag and value") String getTestFlag(); + void setTestFlag(String value); }