Repository: beam Updated Branches: refs/heads/master 10e47646d -> 698b89e2b
Add spilling code to WriteFiles. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69b01a61 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69b01a61 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69b01a61 Branch: refs/heads/master Commit: 69b01a6118702277348d2f625af669225c9ed99e Parents: 10e4764 Author: Reuven Lax <re...@google.com> Authored: Sat May 13 12:53:08 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jun 20 14:28:17 2017 -0700 ---------------------------------------------------------------------- runners/direct-java/pom.xml | 3 +- .../beam/runners/direct/DirectRunner.java | 28 ++-- .../java/org/apache/beam/sdk/io/WriteFiles.java | 133 +++++++++++++++---- .../beam/sdk/testing/TestPipelineOptions.java | 10 ++ .../java/org/apache/beam/sdk/io/SimpleSink.java | 4 + .../org/apache/beam/sdk/io/WriteFilesTest.java | 89 ++++++++++--- 6 files changed, 209 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index bec2113..6346575 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -155,7 +155,8 @@ <systemPropertyVariables> <beamTestPipelineOptions> [ - "--runner=DirectRunner" + "--runner=DirectRunner", + "--unitTest" ] </beamTestPipelineOptions> </systemPropertyVariables> http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java 
index 136ccf3..a16e24d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; @@ -221,15 +222,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @SuppressWarnings("rawtypes") @VisibleForTesting List<PTransformOverride> defaultTransformOverrides() { - return ImmutableList.<PTransformOverride>builder() - .add( - PTransformOverride.of( - PTransformMatchers.writeWithRunnerDeterminedSharding(), - new WriteWithShardingFactory())) /* Uses a view internally. */ - .add( - PTransformOverride.of( - PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN), - new ViewOverrideFactory())) /* Uses pardos and GBKs */ + TestPipelineOptions testOptions = options.as(TestPipelineOptions.class); + ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder(); + if (!testOptions.isUnitTest()) { + builder.add( + PTransformOverride.of( + PTransformMatchers.writeWithRunnerDeterminedSharding(), + new WriteWithShardingFactory())); /* Uses a view internally. 
*/ + } + builder = builder.add( + PTransformOverride.of( + PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN), + new ViewOverrideFactory())) /* Uses pardos and GBKs */ .add( PTransformOverride.of( PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN), @@ -254,9 +258,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */ .add( PTransformOverride.of( - PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN), - new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */ - .build(); + PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN), + new DirectGroupByKeyOverrideFactory())); /* returns two chained primitives. */ + return builder.build(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 2fd10ac..a220eab 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Create; import 
org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -56,8 +58,12 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +92,18 @@ import org.slf4j.LoggerFactory; public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); + // The maximum number of file writers to keep open in a single bundle at a time, since file + // writers default to 64mb buffers. This comes into play when writing per-window files. + // The first 20 files from a single WriteFiles transform will write files inline in the + // transform. Anything beyond that might be shuffled. + // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on + // their own policy. + private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20; + + // When we spill records, shard the output keys to prevent hotspots. + // We could consider making this a parameter. 
+ private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; + static final int UNKNOWN_SHARDNUM = -1; private FileBasedSink<T> sink; private WriteOperation<T> writeOperation; @@ -98,6 +116,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { @Nullable private final ValueProvider<Integer> numShardsProvider; private final boolean windowedWrites; + private final int maxNumWritersPerBundle; /** * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting @@ -105,18 +124,21 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { */ public static <T> WriteFiles<T> to(FileBasedSink<T> sink) { checkNotNull(sink, "sink"); - return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false); + return new WriteFiles<>(sink, null /* runner-determined sharding */, null, + false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE); } private WriteFiles( FileBasedSink<T> sink, @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, @Nullable ValueProvider<Integer> numShardsProvider, - boolean windowedWrites) { + boolean windowedWrites, + int maxNumWritersPerBundle) { this.sink = sink; this.computeNumShards = computeNumShards; this.numShardsProvider = numShardsProvider; this.windowedWrites = windowedWrites; + this.maxNumWritersPerBundle = maxNumWritersPerBundle; } @Override @@ -213,7 +235,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * more information. */ public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) { - return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites); + return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites, + maxNumWritersPerBundle); + } + + /** + * Set the maximum number of writers created in a bundle before spilling to shuffle.
*/ + public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) { + return new WriteFiles<>(sink, computeNumShards, numShardsProvider, windowedWrites, + maxNumWritersPerBundle); } /** @@ -226,7 +257,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { checkNotNull( sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); - return new WriteFiles<>(sink, sharding, null, windowedWrites); + return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle); } /** @@ -234,7 +265,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * runner-determined sharding. */ public WriteFiles<T> withRunnerDeterminedSharding() { - return new WriteFiles<>(sink, null, null, windowedWrites); + return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle); } /** @@ -252,7 +283,8 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * positive value. */ public WriteFiles<T> withWindowedWrites() { - return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true); + return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true, + maxNumWritersPerBundle); } /** @@ -260,7 +292,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
*/ private class WriteWindowedBundles extends DoFn<T, FileResult> { + private final TupleTag<KV<Integer, T>> unwrittenRecordsTag; private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters; + private int spilledShardNum = UNKNOWN_SHARDNUM; + + WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittenRecordsTag) { + this.unwrittenRecordsTag = unwrittenRecordsTag; + } @StartBundle public void startBundle(StartBundleContext c) { @@ -277,19 +315,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo); writer = windowedWriters.get(key); if (writer == null) { - String uuid = UUID.randomUUID().toString(); - LOG.info( - "Opening writer {} for write operation {}, window {} pane {}", - uuid, - writeOperation, - window, - paneInfo); - writer = writeOperation.createWriter(); - writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM); - windowedWriters.put(key, writer); - LOG.debug("Done opening writer"); + if (windowedWriters.size() < maxNumWritersPerBundle) { + String uuid = UUID.randomUUID().toString(); + LOG.info( + "Opening writer {} for write operation {}, window {} pane {}", + uuid, + writeOperation, + window, + paneInfo); + writer = writeOperation.createWriter(); + writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM); + windowedWriters.put(key, writer); + LOG.debug("Done opening writer"); + } else { + if (spilledShardNum == UNKNOWN_SHARDNUM) { + spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); + } else { + spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR; + } + c.output(unwrittenRecordsTag, KV.of(spilledShardNum, c.element())); + return; + } } - writeOrClose(writer, c.element()); } @@ -352,11 +399,17 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { } } + enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING } + /** * Like {@link WriteWindowedBundles} and {@link
WriteUnwindowedBundles}, but where the elements * for each shard have been collected into a single iterable. */ private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> { + private final ShardAssignment shardNumberAssignment; + WriteShardedBundles(ShardAssignment shardNumberAssignment) { + this.shardNumberAssignment = shardNumberAssignment; + } @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { // In a sharded write, single input element represents one shard. We can open and close @@ -364,7 +417,9 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { LOG.info("Opening writer for write operation {}", writeOperation); Writer<T> writer = writeOperation.createWriter(); if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey()); + int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING + ? c.element().getKey() : UNKNOWN_SHARDNUM; + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber); } else { writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM); } @@ -493,14 +548,35 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { // initial ParDo. PCollection<FileResult> results; final PCollectionView<Integer> numShardsView; + @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); if (computeNumShards == null && numShardsProvider == null) { numShardsView = null; - results = - input.apply( - "WriteBundles", - ParDo.of(windowedWrites ?
new WriteWindowedBundles() : new WriteUnwindowedBundles())); + if (windowedWrites) { + TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag"); + TupleTag<KV<Integer, T>> unwrittedRecordsTag = new TupleTag<>("unwrittenRecordsTag"); + PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of( + new WriteWindowedBundles(unwrittedRecordsTag)) + .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); + PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag) + .setCoder(FileResultCoder.of(shardedWindowCoder)); + // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in + // finalize to stay consistent with what WriteWindowedBundles does. + PCollection<FileResult> writtenGroupedFiles = + writeTuple + .get(unwrittedRecordsTag) + .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder())) + .apply("GroupUnwritten", GroupByKey.<Integer, T>create()) + .apply("WriteUnwritten", ParDo.of( + new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))) + .setCoder(FileResultCoder.of(shardedWindowCoder)); + results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles) + .apply(Flatten.<FileResult>pCollections()); + } else { + results = + input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles())); + } } else { List<PCollectionView<?>> sideInputs = Lists.newArrayList(); if (computeNumShards != null) { @@ -517,10 +593,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> { (numShardsView != null) ? 
null : numShardsProvider)) .withSideInputs(sideInputs)) .apply("GroupIntoShards", GroupByKey.<Integer, T>create()); - shardedWindowCoder = - (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder(); - - results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles())); + // Since this path might be used by streaming runners processing triggers, it's important + // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE + // strategy works by sorting all FileResult objects and assigning them numbers, which is not + // guaranteed to work well when processing triggers - if the finalize step retries it might + // see a different Iterable of FileResult objects, and it will assign different shard numbers. + results = sharded.apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))); } results.setCoder(FileResultCoder.of(shardedWindowCoder)); http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index 206bc1f..904f3a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -20,8 +20,10 @@ package org.apache.beam.sdk.testing; import com.fasterxml.jackson.annotation.JsonIgnore; import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import 
org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -50,6 +52,14 @@ public interface TestPipelineOptions extends PipelineOptions { Long getTestTimeoutSeconds(); void setTestTimeoutSeconds(Long value); + @Default.Boolean(false) + @Internal + @Hidden + @org.apache.beam.sdk.options.Description( + "Indicates whether this is an automatically-run unit test.") + boolean isUnitTest(); + void setUnitTest(boolean unitTest); + /** * Factory for {@link PipelineResult} matchers which always pass. */ http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index c97313d..bdf37f6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -40,6 +40,10 @@ class SimpleSink extends FileBasedSink<String> { writableByteChannelFactory); } + public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) { + super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy); + } + @Override public SimpleWriteOperation createWriteOperation() { return new SimpleWriteOperation(this); http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index a5dacd1..e6a0dcf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -41,6 +41,7 @@ import java.util.List; import 
java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.SimpleSink.SimpleWriter; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -62,12 +63,15 @@ import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -160,7 +164,7 @@ public class WriteFilesTest { public void testWrite() throws IOException { List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle", "Intimidating pigeon", "Pedantic gull", "Frisky finch"); - runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); } /** @@ -169,7 +173,8 @@ public class WriteFilesTest { @Test @Category(NeedsRunner.class) public void testEmptyWrite() throws IOException { - runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename()); + runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink())); checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1)); } @@ -185,7 +190,7 @@ public class 
WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - Optional.of(1)); + WriteFiles.to(makeSimpleSink()).withNumShards(1)); } private ResourceId getBaseOutputDirectory() { @@ -194,7 +199,8 @@ public class WriteFilesTest { } private SimpleSink makeSimpleSink() { - return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple"); + FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple"); + return new SimpleSink(getBaseOutputDirectory(), filenamePolicy); } @Test @@ -235,7 +241,7 @@ public class WriteFilesTest { Arrays.asList("one", "two", "three", "four", "five", "six"), IDENTITY_MAP, getBaseOutputFilename(), - Optional.of(20)); + WriteFiles.to(makeSimpleSink()).withNumShards(20)); } /** @@ -245,7 +251,7 @@ public class WriteFilesTest { @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws IOException { List<String> inputs = new ArrayList<>(); - runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename()); + runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); } /** @@ -258,7 +264,7 @@ public class WriteFilesTest { "Intimidating pigeon", "Pedantic gull", "Frisky finch"); runWrite( inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))), - getBaseOutputFilename()); + getBaseOutputFilename(), WriteFiles.to(makeSimpleSink())); } /** @@ -274,10 +280,23 @@ public class WriteFilesTest { inputs, new WindowAndReshuffle<>( Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))), - getBaseOutputFilename()); + getBaseOutputFilename(), + WriteFiles.to(makeSimpleSink())); } @Test + @Category(NeedsRunner.class) + public void testWriteSpilling() throws IOException { + List<String> inputs = Lists.newArrayList(); + for (int i = 0; i < 100; ++i) { + inputs.add("mambo_number_" + i); + } + runWrite( + inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))), + getBaseOutputFilename(), + 
WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites()); + } + @Test public void testBuildWrite() { SimpleSink sink = makeSimpleSink(); WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3); @@ -365,8 +384,45 @@ public class WriteFilesTest { */ private void runWrite( List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform, - String baseName) throws IOException { - runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent()); + String baseName, WriteFiles<String> write) throws IOException { + runShardedWrite(inputs, transform, baseName, write); + } + + private static class PerWindowFiles extends FilenamePolicy { + private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis(); + private final String prefix; + private final String suffix; + + public PerWindowFiles(String prefix, String suffix) { + this.prefix = prefix; + this.suffix = suffix; + } + + public String filenamePrefixForWindow(IntervalWindow window) { + return String.format("%s%s-%s", + prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); + } + + @Override + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext context, String extension) { + IntervalWindow window = (IntervalWindow) context.getWindow(); + String filename = String.format( + "%s-%s-of-%s%s%s", + filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), + extension, suffix); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + } + + @Override + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context context, String extension) { + String filename = String.format( + "%s%s-of-%s%s%s", + prefix, context.getShardNumber(), context.getNumShards(), + extension, suffix); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + } } /** @@ -379,7 +435,7 @@ public class WriteFilesTest { List<String> inputs,
PTransform<PCollection<String>, PCollection<String>> transform, String baseName, - Optional<Integer> numConfiguredShards) throws IOException { + WriteFiles<String> write) throws IOException { // Flag to validate that the pipeline options are passed to the Sink WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class); options.setTestFlag("test_value"); @@ -390,18 +446,15 @@ public class WriteFilesTest { for (long i = 0; i < inputs.size(); i++) { timestamps.add(i + 1); } - - SimpleSink sink = makeSimpleSink(); - WriteFiles<String> write = WriteFiles.to(sink); - if (numConfiguredShards.isPresent()) { - write = write.withNumShards(numConfiguredShards.get()); - } p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of())) .apply(transform) .apply(write); p.run(); - checkFileContents(baseName, inputs, numConfiguredShards); + Optional<Integer> numShards = + (write.getNumShards() != null) + ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent(); + checkFileContents(baseName, inputs, numShards); } static void checkFileContents(String baseName, List<String> inputs,