Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as 
a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dea0992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dea0992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dea0992

Branch: refs/heads/python-sdk
Commit: 6dea0992d9976b39232cf846906831feaa25ec43
Parents: 63331aa
Author: Stas Levin <stasle...@gmail.com>
Authored: Tue Dec 20 13:26:07 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 25 +++++++++++++-------
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 13 ++++++----
 2 files changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6dea0992/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index dc566d2..4ddfdea 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -643,6 +643,7 @@ public class BigQueryIOTest implements Serializable {
     }
   }
 
+  @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
@@ -1370,7 +1371,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testBuildWriteWithoutTable() {
-    Pipeline p = TestPipeline.create();
+
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("must set the table reference");
     p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
@@ -1591,9 +1592,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsCreateNoSchema() {
+    p.enableAbandonedNodeEnforcement(false);
+
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("no schema was provided");
-    TestPipeline.create()
+    p
         .apply(Create.<TableRow>of())
         .apply(BigQueryIO.Write
             .to("dataset.table")
@@ -1602,9 +1605,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsTableAndTableSpec() {
+    p.enableAbandonedNodeEnforcement(false);
+
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Cannot set both a table reference and a table function");
-    TestPipeline.create()
+    p
         .apply(Create.<TableRow>of())
         .apply(BigQueryIO.Write
             .to("dataset.table")
@@ -1618,9 +1623,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsNoTableAndNoTableSpec() {
+    p.enableAbandonedNodeEnforcement(false);
+
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
-    TestPipeline.create()
+    p
         .apply(Create.<TableRow>of())
         .apply("name", BigQueryIO.Write.withoutValidation());
   }
@@ -1950,7 +1957,6 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPassThroughThenCleanup() throws Exception {
-    Pipeline p = TestPipeline.create();
 
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3))
@@ -1968,7 +1974,6 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPassThroughThenCleanupExecuted() throws Exception {
-    Pipeline p = TestPipeline.create();
 
     p.apply(Create.<Integer>of())
         .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
@@ -2025,6 +2030,8 @@ public class BigQueryIOTest implements Serializable {
 
   private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions)
       throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     List<Long> expectedPartitionIds = Lists.newArrayList();
     for (long i = 1; i <= expectedNumPartitions; ++i) {
       expectedPartitionIds.add(i);
@@ -2044,7 +2051,7 @@ public class BigQueryIOTest implements Serializable {
         new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
 
     PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
-        TestPipeline.create(),
+        p,
         WindowingStrategy.globalDefault(),
         KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
 
@@ -2164,6 +2171,8 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteRename() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done")
@@ -2179,7 +2188,7 @@ public class BigQueryIOTest implements Serializable {
     }
 
     PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
-        TestPipeline.create(),
+        p,
         WindowingStrategy.globalDefault(),
         StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6dea0992/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 98215df..e3d1f47 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -106,6 +106,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class BigtableIOTest {
+  @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
 
@@ -140,7 +141,7 @@ public class BigtableIOTest {
     service = new FakeBigtableService();
     defaultRead = defaultRead.withBigtableService(service);
     defaultWrite = defaultWrite.withBigtableService(service);
-    bigtableCoder = TestPipeline.create().getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+    bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
   }
 
   @Test
@@ -261,6 +262,8 @@ public class BigtableIOTest {
   /** Tests that when reading from a non-existent table, the read fails. */
   @Test
   public void testReadingFailsTableDoesNotExist() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     final String table = "TEST-TABLE";
 
     BigtableIO.Read read =
@@ -273,7 +276,7 @@ public class BigtableIOTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
-    TestPipeline.create().apply(read);
+    p.apply(read);
   }
 
   /** Tests that when reading from an empty table, the read succeeds. */
@@ -589,7 +592,6 @@ public class BigtableIOTest {
 
     service.createTable(table);
 
-    TestPipeline p = TestPipeline.create();
     p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
         .apply("write", defaultWrite.withTableId(table));
     p.run();
@@ -606,10 +608,12 @@ public class BigtableIOTest {
   /** Tests that when writing to a non-existent table, the write fails. */
   @Test
   public void testWritingFailsTableDoesNotExist() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     final String table = "TEST-TABLE";
 
     PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
-        TestPipeline.create().apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
+        p.apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
 
     // Exception will be thrown by write.validate() when write is applied.
     thrown.expect(IllegalArgumentException.class);
@@ -625,7 +629,6 @@ public class BigtableIOTest {
     final String key = "KEY";
     service.createTable(table);
 
-    TestPipeline p = TestPipeline.create();
     p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
         .apply(defaultWrite.withTableId(table));
 

Reply via email to