This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 060f05c659920c3a48dbc67c38db770788802d06 Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Mon Nov 27 11:41:25 2017 -0800 Fixes tests --- .../core/construction/WriteFilesTranslation.java | 2 +- .../java/org/apache/beam/sdk/io/FileBasedSink.java | 32 ++++++++--------- .../java/org/apache/beam/sdk/io/WriteFiles.java | 17 +++++++-- .../org/apache/beam/sdk/io/FileBasedSinkTest.java | 42 ++++++++++++---------- 4 files changed, 55 insertions(+), 38 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index a6dd55c..90f6453 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -303,7 +303,7 @@ public class WriteFilesTranslation { public Map<Class<? extends PTransform>, TransformPayloadTranslator> getTransformPayloadTranslators() { return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap( - WriteFiles.class, new WriteFilesTranslator()); + WriteFiles.CONCRETE_CLASS, new WriteFilesTranslator()); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 12c4555..48d7521 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -699,7 +699,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> // if set. Set<Integer> missingShardNums; if (numShards == null) { - missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM); + missingShardNums = + existingResults.isEmpty() + ? ImmutableSet.of(UNKNOWN_SHARDNUM) + : ImmutableSet.<Integer>of(); } else { missingShardNums = Sets.newHashSet(); for (int i = 0; i < numShards; ++i) { @@ -726,8 +729,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> String uuid = UUID.randomUUID().toString(); LOG.info("Opening empty writer {} for destination {}", uuid, dest); Writer<DestinationT, ?> writer = createWriter(); + writer.setDestination(dest); // Currently this code path is only called in the unwindowed case. - writer.open(uuid, dest); + writer.open(uuid); writer.close(); completeResults.add( new FileResult<>( @@ -760,8 +764,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException { int numFiles = resultsToFinalFilenames.size(); LOG.debug("Copying {} files.", numFiles); - List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size()); - List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size()); + List<ResourceId> srcFiles = new ArrayList<>(); + List<ResourceId> dstFiles = new ArrayList<>(); for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { srcFiles.add(entry.getKey().getTempFilename()); dstFiles.add(entry.getValue()); @@ -923,22 +927,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> protected void finishWrite() throws Exception {} /** - * Performs bundle initialization. For example, creates a temporary file for writing or - * initializes any state that will be used across calls to {@link Writer#write}. + * Opens a uniquely named temporary file and initializes the writer using {@link #prepareWrite}. * * <p>The unique id that is given to open should be used to ensure that the writer's output does * not interfere with the output of other Writers, as a bundle may be executed many times for * fault tolerance. - * - * <p>The window and paneInfo arguments are populated when windowed writes are requested. shard - * id populated for the case of static sharding. In cases where the runner is dynamically - * picking sharding, shard might be set to -1. */ - public final void open( - String uId, DestinationT destination) - throws Exception { + public final void open(String uId) throws Exception { this.id = uId; - this.destination = destination; ResourceId tempDirectory = getWriteOperation().tempDirectory.get(); outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE); verifyNotNull( @@ -1040,6 +1036,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> return writeOperation; } + void setDestination(DestinationT destination) { + this.destination = destination; + } + /** Return the user destination object for this writer. */ public DestinationT getDestination() { return destination; @@ -1064,8 +1064,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { - checkArgument(window != null); - checkArgument(paneInfo != null); + checkArgument(window != null, "window can not be null"); + checkArgument(paneInfo != null, "paneInfo can not be null"); this.tempFilename = tempFilename; this.shard = shard; this.window = window; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 12f5cce..54f055d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; @@ -114,6 +115,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> { private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); + /** For internal use by runners. */ + @Internal + public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class; + // The maximum number of file writers to keep open in a single bundle at a time, since file // writers default to 64mb buffers. This comes into play when writing per-window files. // The first 20 files from a single WriteFiles transform will write files inline in the @@ -497,7 +502,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> paneInfo, destination); writer = writeOperation.createWriter(); - writer.open(uuid, destination); + writer.setDestination(destination); + writer.open(uuid); writers.put(key, writer); LOG.debug("Done opening writer"); } else { @@ -623,7 +629,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> "ApplyShardingKey", ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder)) .withSideInputs( - numShardsView == null + (numShardsView == null) ? ImmutableList.<PCollectionView<Integer>>of() : ImmutableList.of(numShardsView))) .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) @@ -706,7 +712,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> c.pane(), destination); writer = writeOperation.createWriter(); - writer.open(uuid, destination); + writer.setDestination(destination); + writer.open(uuid); writers.put(destination, writer); } writeOrClose(writer, getDynamicDestinations().formatRecord(input)); @@ -724,6 +731,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> throw e; } int shard = c.element().getKey().getShardNumber(); + checkArgument( + shard != UNKNOWN_SHARDNUM, + "Shard should have been set, but is unset for element %s", + c.element()); c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 561d036..0c9bdc1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -52,6 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; @@ -100,7 +102,7 @@ public class FileBasedSinkTest { SimpleSink.SimpleWriter<Void> writer = buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); - writer.open(testUid, null); + writer.open(testUid); for (String value : values) { writer.write(value); } @@ -196,14 +198,14 @@ public class FileBasedSinkTest { new FileResult<Void>( LocalResources.fromFile(temporaryFiles.get(i), false), UNKNOWN_SHARDNUM, - null, - null, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, null)); } // TODO: test with null first argument? List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = - writeOp.finalizeDestination(null, null, null, fileResults); + writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, null, fileResults); writeOp.moveToOutputFiles(resultsToFinalFilenames); for (int i = 0; i < numFiles; i++) { @@ -213,7 +215,7 @@ public class FileBasedSinkTest { .getDynamicDestinations() .getFilenamePolicy(null) .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED); - assertTrue(new File(outputFilename.toString()).exists()); + assertTrue(outputFilename.toString(), new File(outputFilename.toString()).exists()); assertFalse(temporaryFiles.get(i).exists()); } @@ -292,7 +294,11 @@ public class FileBasedSinkTest { resultsToFinalFilenames.add( KV.of( new FileResult<Void>( - LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null), + LocalResources.fromFile(inputTmpFile, false), + UNKNOWN_SHARDNUM, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + null), finalFilename)); } @@ -354,22 +360,22 @@ public class FileBasedSinkTest { SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED); SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink); - ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE); - ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE); - ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE); - // More than one shard does. try { - List<FileResult<Void>> results = - Lists.newArrayList( - new FileResult<Void>(temp1, 1 /* shard */, null, null, null), - new FileResult<Void>(temp2, 1 /* shard */, null, null, null), - new FileResult<Void>(temp3, 1 /* shard */, null, null, null)); - writeOp.finalizeDestination(null, null, 5 /* numShards */, results); + List<FileResult<Void>> results = Lists.newArrayList(); + for (int i = 0; i < 3; ++i) { + results.add(new FileResult<Void>( + root.resolve("temp" + i, StandardResolveOptions.RESOLVE_FILE), + 1 /* shard - should be different, but is the same */, + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + null)); + } + writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, 5 /* numShards */, results); fail("Should have failed."); } catch (IllegalArgumentException exn) { assertThat(exn.getMessage(), containsString("generated the same name")); + assertThat(exn.getMessage(), containsString("temp0")); assertThat(exn.getMessage(), containsString("temp1")); - assertThat(exn.getMessage(), containsString("temp2")); } } @@ -505,7 +511,7 @@ public class FileBasedSinkTest { expected.add("footer"); expected.add("footer"); - writer.open(testUid, null); + writer.open(testUid); writer.write("a"); writer.write("b"); writer.close(); -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.