[BEAM-2544] Fix flaky AvroIOTest by eliminating race condition in "write then read" tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/911edbad Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/911edbad Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/911edbad Branch: refs/heads/DSL_SQL Commit: 911edbade388a63626e0ad6f8b7c2ad7a9f9b7c2 Parents: dd9e866 Author: Alex Filatov <alex-fila...@users.noreply.github.com> Authored: Thu Jun 29 23:23:04 2017 +0300 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jul 18 15:49:44 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 46 +++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/911edbad/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 4a1386c..4380c57 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -90,7 +90,11 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AvroIOTest { - @Rule public TestPipeline p = TestPipeline.create(); + @Rule + public TestPipeline writePipeline = TestPipeline.create(); + + @Rule + public TestPipeline readPipeline = TestPipeline.create(); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -144,15 +148,15 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - p.apply(Create.of(values)) + writePipeline.apply(Create.of(values)) .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); - p.run(); + writePipeline.run().waitUntilFinish(); PCollection<GenericClass> input = - p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); - p.run(); + readPipeline.run(); } @Test @@ -163,19 +167,19 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - p.apply(Create.of(values)) + writePipeline.apply(Create.of(values)) .apply( AvroIO.write(GenericClass.class) .to(outputFile.getAbsolutePath()) .withoutSharding() .withCodec(CodecFactory.deflateCodec(9))); - p.run(); + writePipeline.run().waitUntilFinish(); PCollection<GenericClass> input = - p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); - p.run(); + readPipeline.run(); DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); assertEquals("deflate", dataFileStream.getMetaString("avro.codec")); @@ -189,19 +193,19 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - p.apply(Create.of(values)) + writePipeline.apply(Create.of(values)) .apply( AvroIO.write(GenericClass.class) .to(outputFile.getAbsolutePath()) .withoutSharding() .withCodec(CodecFactory.nullCodec())); - p.run(); + writePipeline.run().waitUntilFinish(); PCollection<GenericClass> input = - p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); + readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(values); - p.run(); + readPipeline.run(); DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); assertEquals("null", dataFileStream.getMetaString("avro.codec")); @@ -261,18 +265,18 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - p.apply(Create.of(values)) + writePipeline.apply(Create.of(values)) .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding()); - p.run(); + writePipeline.run().waitUntilFinish(); List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); PCollection<GenericClassV2> input = - p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())); + readPipeline.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath())); PAssert.that(input).containsInAnyOrder(expected); - p.run(); + readPipeline.run(); } private static class WindowedFilenamePolicy extends FilenamePolicy { @@ -467,7 +471,7 @@ public class AvroIOTest { ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); - p.apply(Create.of(values)) + writePipeline.apply(Create.of(values)) .apply( AvroIO.write(GenericClass.class) .to(outputFile.getAbsolutePath()) @@ -480,7 +484,7 @@ public class AvroIOTest { 100L, "bytesKey", "bytesValue".getBytes()))); - p.run(); + writePipeline.run(); DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader()); @@ -502,8 +506,8 @@ public class AvroIOTest { System.out.println("no sharding"); write = write.withoutSharding(); } - p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); - p.run(); + writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); + writePipeline.run(); String shardNameTemplate = firstNonNull(