This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit b2d0671185fa1bd7f100853c7921e555c84578e7 Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Wed Nov 15 18:25:14 2017 -0800 non-null window/pane in FileResult --- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 10 ++++-- .../java/org/apache/beam/sdk/io/WriteFiles.java | 38 ++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 2108253..c8bdbfc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -655,6 +655,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> checkArgument( result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result); ResourceId finalFilename = result.getDestinationFile( + windowedWrites, getSink().getDynamicDestinations(), effectiveNumShards, getSink().getWritableByteChannelFactory()); @@ -984,7 +985,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> public static final class FileResult<DestinationT> { private final ResourceId tempFilename; private final int shard; - private final @Nullable BoundedWindow window; + private final BoundedWindow window; private final PaneInfo paneInfo; private final DestinationT destination; @@ -992,9 +993,11 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> public FileResult( ResourceId tempFilename, int shard, - @Nullable BoundedWindow window, + BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { + checkArgument(window != null); + checkArgument(paneInfo != null); this.tempFilename = tempFilename; this.shard = shard; this.window = window; @@ -1029,13 +1032,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> @Experimental(Kind.FILESYSTEM) public ResourceId getDestinationFile( + boolean windowedWrites, DynamicDestinations<?, DestinationT, ?> dynamicDestinations, int numShards, OutputFileHints outputFileHints) { checkArgument(getShard() != UNKNOWN_SHARDNUM); checkArgument(numShards > 0); FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination); - if (getWindow() != null) { + if (windowedWrites) { return policy.windowedFilename( getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints); } else { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 35b28a1..19457e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -387,7 +388,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> { private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag; private final Coder<DestinationT> destinationCoder; - private final boolean windowedWrites; // Initialized in startBundle() private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers; @@ -395,10 +395,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> private int spilledShardNum = UNKNOWN_SHARDNUM; WriteBundles( - boolean windowedWrites, TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag, Coder<DestinationT> destinationCoder) { - this.windowedWrites = windowedWrites; this.unwrittenRecordsTag = unwrittenRecordsTag; this.destinationCoder = destinationCoder; } @@ -466,13 +464,11 @@ public class WriteFiles<UserT, DestinationT, OutputT> throw e; } BoundedWindow window = key.window; - FileResult<DestinationT> res = - windowedWrites - ? new FileResult<>( - writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination) - : new FileResult<>( - writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination); - c.output(res, window.maxTimestamp(), window); + c.output( + new FileResult<>( + writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination), + window.maxTimestamp(), + window); } } @@ -535,14 +531,9 @@ public class WriteFiles<UserT, DestinationT, OutputT> shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING ? c.element().getKey().getShardNumber() : UNKNOWN_SHARDNUM; - if (windowedWrites) { - c.output( - new FileResult<>( - writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey())); - } else { - c.output( - new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey())); - } + c.output( + new FileResult<>( + writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey())); } } @@ -706,7 +697,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> PCollectionTuple writeTuple = input.apply( writeName, - ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder)) + ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder)) .withSideInputs(sideInputs) .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag))); PCollection<FileResult<DestinationT>> writtenBundleFiles = @@ -1011,14 +1002,19 @@ public class WriteFiles<UserT, DestinationT, OutputT> writer.open(uuid, destination); writer.close(); completeResults.add( - new FileResult<>(writer.getOutputFile(), shard, null, null, destination)); + new FileResult<>( + writer.getOutputFile(), + shard, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + destination)); } LOG.debug("Done creating extra shards for {}.", destination); } return writeOperation.buildOutputFilenames( destination, - null, + GlobalWindow.INSTANCE, (fixedNumShards == null) ? null : completeResults.size(), completeResults); } -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.