Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d478c0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d478c0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d478c0f

Branch: refs/heads/python-sdk
Commit: 8d478c0f38c656d3533d590a65c6ed95da229f81
Parents: 5ccbe67
Author: Stas Levin <stasle...@gmail.com>
Authored: Tue Dec 20 17:31:23 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d478c0f/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 4c3be6d..7259ce8 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -28,7 +28,6 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -38,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -55,6 +55,9 @@ public class JmsIOTest {
   private BrokerService broker;
   private ConnectionFactory connectionFactory;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Before
   public void startBroker() throws Exception {
     broker = new BrokerService();
@@ -92,8 +95,6 @@ public class JmsIOTest {
     session.close();
     connection.close();
 
-    Pipeline pipeline = TestPipeline.create();
-
     // read from the queue
     PCollection<JmsRecord> output = pipeline.apply(
         JmsIO.read()
@@ -117,8 +118,6 @@ public class JmsIOTest {
   @Category(NeedsRunner.class)
   public void testWriteMessage() throws Exception {
 
-    Pipeline pipeline = TestPipeline.create();
-
     ArrayList<String> data = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
       data.add("Message " + i);