[BEAM-13] Add JmsIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2cf75568 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2cf75568 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2cf75568 Branch: refs/heads/master Commit: 2cf755686f2518ed9575ec5c087884be0e6ea678 Parents: 76928d3 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Thu May 5 19:14:37 2016 +0200 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Jul 27 13:49:28 2016 -0700 ---------------------------------------------------------------------- sdks/java/io/jms/pom.xml | 136 +++++ .../beam/sdk/io/jms/JmsCheckpointMark.java | 82 +++ .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 518 +++++++++++++++++++ .../org/apache/beam/sdk/io/jms/JmsRecord.java | 153 ++++++ .../apache/beam/sdk/io/jms/package-info.java | 22 + .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 145 ++++++ sdks/java/io/pom.xml | 1 + 7 files changed, 1057 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml new file mode 100644 index 0000000..e0e0f36 --- /dev/null +++ b/sdks/java/io/jms/pom.xml @@ -0,0 +1,136 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>jms</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: JMS</name> + <description>IO to read and write to JMS (Java Messaging Service) + destinations (queues and topics). </description> + + <properties> + <activemq.version>5.13.1</activemq.version> + <geronimo-jms.version>1.1.1</geronimo-jms.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <version>${geronimo-jms.version}</version> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java new file mode 100644 index 0000000..81c2b82 --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Message; + +/** + * Checkpoint for an unbounded JmsIO.Read. Consists of + * JMS destination name, and the latest message ID consumed so far. + */ +@DefaultCoder(AvroCoder.class) +public class JmsCheckpointMark implements UnboundedSource.CheckpointMark { + + private final List<Message> messages = new ArrayList<>(); + private Instant oldestPendingTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + + public JmsCheckpointMark() { + } + + protected List<Message> getMessages() { + return this.messages; + } + + protected void addMessage(Message message) throws Exception { + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + if (currentMessageTimestamp.isBefore(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + messages.add(message); + } + + protected Instant getOldestPendingTimestamp() { + return oldestPendingTimestamp; + } + + /** + * Acknowledge all outstanding message. Since we believe that messages will be delivered in + * timestamp order, and acknowledged messages will not be retried, the newest message in this + * batch is a good bound for future messages. + */ + @Override + public void finalizeCheckpoint() { + for (Message message : messages) { + try { + message.acknowledge(); + Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); + if (currentMessageTimestamp.isAfter(oldestPendingTimestamp)) { + oldestPendingTimestamp = currentMessageTimestamp; + } + } catch (Exception e) { + // nothing to do + } + } + messages.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java new file mode 100644 index 0000000..2de933c --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; + +import com.google.common.annotations.VisibleForTesting; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * An unbounded source for JMS destinations (queues or topics). + * + * <h3>Reading from a JMS destination</h3> + * + * <p>JmsIO source returns unbounded collection of JMS records as {@code PCollection<JmsRecord<T>>}. + * A {@link JmsRecord} includes JMS headers and properties, along with the JMS message payload.</p> + * + * <p>To configure a JMS source, you have to provide a {@link javax.jms.ConnectionFactory} + * and the destination (queue or topic) where to consume. The following example + * illustrates various options for configuring the source:</p> + * + * <pre>{@code + * + * pipeline.apply(JmsIO.read() + * .withConnectionFactory(myConnectionFactory) + * .withQueue("my-queue") + * // above two are required configuration, returns PCollection<JmsRecord<byte[]>> + * + * // rest of the settings are optional + * + * }</pre> + * + * <h3>Writing to a JMS destination</h3> + * + * JmsIO sink supports writing text messages to a JMS destination on a broker. + * To configure a JMS sink, you must specify a {@link javax.jms.ConnectionFactory} and a + * {@link javax.jms.Destination} name. + * For instance: + * + * <pre>{@code + * + * pipeline + * .apply(...) // returns PCollection<String> + * .apply(JmsIO.write() + * .withConnectionFactory(myConnectionFactory) + * .withQueue("my-queue") + * + * }</pre> + */ +public class JmsIO { + + private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); + + public static Read read() { + return new Read(); + } + + public static Write write() { + return new Write(); + } + + /** + * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more + * information on usage and configuration. + */ + public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> { + + public Read withConnectionFactory(ConnectionFactory connectionFactory) { + return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + } + + public Read withQueue(String queue) { + return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + } + + public Read withTopic(String topic) { + return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + } + + public Read withMaxNumRecords(long maxNumRecords) { + return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + } + + public Read withMaxReadTime(Duration maxReadTime) { + return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + } + + @Override + public PCollection<JmsRecord> apply(PBegin input) { + // handles unbounded source to bounded conversion if maxNumRecords is set. + Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource()); + + PTransform<PInput, PCollection<JmsRecord>> transform = unbounded; + + if (maxNumRecords != Long.MAX_VALUE) { + transform = unbounded.withMaxNumRecords(maxNumRecords); + } else if (maxReadTime != null) { + transform = unbounded.withMaxReadTime(maxReadTime); + } + + return input.getPipeline().apply(transform); + } + + @Override + public void validate(PBegin input) { + checkNotNull(connectionFactory, "ConnectionFactory not specified"); + checkArgument((queue != null || topic != null), "Either queue or topic not specified"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder.addIfNotNull(DisplayData.item("queue", queue)); + builder.addIfNotNull(DisplayData.item("topic", topic)); + + } + + /////////////////////////////////////////////////////////////////////////////////////// + + /** + * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html + * "It is expected that JMS providers will provide the tools an administrator needs to create + * and configure administered objects in a JNDI namespace. JMS provider implementations of + * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so + * that they can be stored in all JNDI naming contexts. In addition, it is recommended that + * these implementations follow the JavaBeansTM design patterns." + * + * So, a {@link ConnectionFactory} implementation is serializable. + */ + protected ConnectionFactory connectionFactory; + @Nullable + protected String queue; + @Nullable + protected String topic; + protected long maxNumRecords; + protected Duration maxReadTime; + + private Read() {} + + private Read( + ConnectionFactory connectionFactory, + String queue, + String topic, + long maxNumRecords, + Duration maxReadTime) { + super("JmsIO.Read"); + + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + this.maxNumRecords = maxNumRecords; + this.maxReadTime = maxReadTime; + } + + /** + * Creates an {@link UnboundedSource<JmsRecord, ?>} with the configuration in + * {@link Read}. Primary use case is unit tests, should not be used in an + * application. + */ + @VisibleForTesting + UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() { + return new UnboundedJmsSource( + connectionFactory, + queue, + topic); + } + + } + + private JmsIO() {} + + private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> { + + private final ConnectionFactory connectionFactory; + private final String queue; + private final String topic; + + public UnboundedJmsSource( + ConnectionFactory connectionFactory, + String queue, + String topic) { + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + } + + @Override + public List<UnboundedJmsSource> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + List<UnboundedJmsSource> sources = new ArrayList<>(); + for (int i = 0; i < desiredNumSplits; i++) { + sources.add(new UnboundedJmsSource(connectionFactory, queue, topic)); + } + return sources; + } + + @Override + public UnboundedJmsReader createReader(PipelineOptions options, + JmsCheckpointMark checkpointMark) { + return new UnboundedJmsReader(this, checkpointMark); + } + + @Override + public void validate() { + checkNotNull(connectionFactory, "ConnectionFactory is not defined"); + checkArgument((queue != null || topic != null), "Either queue or topic is not defined"); + } + + @Override + public Coder getCheckpointMarkCoder() { + return AvroCoder.of(JmsCheckpointMark.class); + } + + @Override + public Coder<JmsRecord> getDefaultOutputCoder() { + return SerializableCoder.of(JmsRecord.class); + } + + } + + private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> { + + private UnboundedJmsSource source; + private JmsCheckpointMark checkpointMark; + private Connection connection; + private Session session; + private MessageConsumer consumer; + + private JmsRecord currentRecord; + private Instant currentTimestamp; + + public UnboundedJmsReader( + UnboundedJmsSource source, + JmsCheckpointMark checkpointMark) { + this.source = source; + if (checkpointMark != null) { + this.checkpointMark = checkpointMark; + } else { + this.checkpointMark = new JmsCheckpointMark(); + } + this.currentRecord = null; + } + + @Override + public boolean start() throws IOException { + ConnectionFactory connectionFactory = source.connectionFactory; + try { + this.connection = connectionFactory.createConnection(); + this.connection.start(); + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + if (source.topic != null) { + this.consumer = this.session.createConsumer(this.session.createTopic(source.topic)); + } else { + this.consumer = this.session.createConsumer(this.session.createQueue(source.queue)); + } + + return advance(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public boolean advance() throws IOException { + try { + TextMessage message = (TextMessage) this.consumer.receiveNoWait(); + + if (message == null) { + currentRecord = null; + return false; + } + + Map<String, Object> properties = new HashMap<>(); + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String propertyName = (String) propertyNames.nextElement(); + properties.put(propertyName, message.getObjectProperty(propertyName)); + } + + JmsRecord jmsRecord = new JmsRecord( + message.getJMSMessageID(), + message.getJMSTimestamp(), + message.getJMSCorrelationID(), + message.getJMSReplyTo(), + message.getJMSDestination(), + message.getJMSDeliveryMode(), + message.getJMSRedelivered(), + message.getJMSType(), + message.getJMSExpiration(), + message.getJMSPriority(), + properties, + message.getText()); + + checkpointMark.addMessage(message); + + currentRecord = jmsRecord; + currentTimestamp = new Instant(message.getJMSTimestamp()); + + return true; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public JmsRecord getCurrent() throws NoSuchElementException { + if (currentRecord == null) { + throw new NoSuchElementException(); + } + return currentRecord; + } + + @Override + public Instant getWatermark() { + return checkpointMark.getOldestPendingTimestamp(); + } + + @Override + public Instant getCurrentTimestamp() { + if (currentRecord == null) { + throw new NoSuchElementException(); + } + return currentTimestamp; + } + + @Override + public CheckpointMark getCheckpointMark() { + return checkpointMark; + } + + @Override + public UnboundedSource<JmsRecord, ?> getCurrentSource() { + return source; + } + + @Override + public void close() throws IOException { + try { + if (consumer != null) { + consumer.close(); + consumer = null; + } + if (session != null) { + session.close(); + session = null; + } + if (connection != null) { + connection.stop(); + connection.close(); + connection = null; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + } + + /** + * A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for + * more information on usage and configuration. + */ + public static class Write extends PTransform<PCollection<String>, PDone> { + + protected ConnectionFactory connectionFactory; + protected String queue; + protected String topic; + + public Write withConnectionFactory(ConnectionFactory connectionFactory) { + return new Write(connectionFactory, queue, topic); + } + + public Write withQueue(String queue) { + return new Write(connectionFactory, queue, topic); + } + + public Write withTopic(String topic) { + return new Write(connectionFactory, queue, topic); + } + + private Write() {} + + private Write(ConnectionFactory connectionFactory, String queue, String topic) { + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + } + + @Override + public PDone apply(PCollection<String> input) { + input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection<String> input) { + checkNotNull(connectionFactory, "ConnectionFactory is not defined"); + checkArgument((queue != null || topic != null), "Either queue or topic is required"); + } + + private static class JmsWriter extends DoFn<String, Void> { + + private ConnectionFactory connectionFactory; + private String queue; + private String topic; + + private Connection connection; + private Session session; + private MessageProducer producer; + + public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) { + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + } + + @Override + public void startBundle(Context c) throws Exception { + if (producer == null) { + this.connection = connectionFactory.createConnection(); + this.connection.start(); + /** + * false means we don't use JMS transaction. + */ + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination; + if (queue != null) { + destination = session.createQueue(queue); + } else { + destination = session.createTopic(topic); + } + this.producer = this.session.createProducer(destination); + } + } + + @Override + public void processElement(ProcessContext ctx) throws Exception { + String value = ctx.element(); + + try { + TextMessage message = session.createTextMessage(value); + producer.send(message); + } catch (Exception t) { + finishBundle(null); + throw t; + } + } + + @Override + public void finishBundle(Context c) throws Exception { + producer.close(); + producer = null; + session.close(); + session = null; + connection.stop(); + connection.close(); + connection = null; + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java new file mode 100644 index 0000000..aa0c472 --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +import javax.jms.Destination; + +/** + * JmsRecord contains message payload of the record + * as well as metadata (JMS headers and properties). + */ +public class JmsRecord implements Serializable { + + private final String jmsMessageID; + private final long jmsTimestamp; + private final String jmsCorrelationID; + private final Destination jmsReplyTo; + private final Destination jmsDestination; + private final int jmsDeliveryMode; + private final boolean jmsRedelivered; + private final String jmsType; + private final long jmsExpiration; + private final int jmsPriority; + private final Map<String, Object> properties; + private final String text; + + public JmsRecord( + String jmsMessageID, + long jmsTimestamp, + String jmsCorrelationID, + Destination jmsReplyTo, + Destination jmsDestination, + int jmsDeliveryMode, + boolean jmsRedelivered, + String jmsType, + long jmsExpiration, + int jmsPriority, + Map<String, Object> properties, + String text) { + this.jmsMessageID = jmsMessageID; + this.jmsTimestamp = jmsTimestamp; + this.jmsCorrelationID = jmsCorrelationID; + this.jmsReplyTo = jmsReplyTo; + this.jmsDestination = jmsDestination; + this.jmsDeliveryMode = jmsDeliveryMode; + this.jmsRedelivered = jmsRedelivered; + this.jmsType = jmsType; + this.jmsExpiration = jmsExpiration; + this.jmsPriority = jmsPriority; + this.properties = properties; + this.text = text; + } + + public String getJmsMessageID() { + return jmsMessageID; + } + + public long getJmsTimestamp() { + return jmsTimestamp; + } + + public String getJmsCorrelationID() { + return jmsCorrelationID; + } + + public Destination getJmsReplyTo() { + return jmsReplyTo; + } + + public Destination getJmsDestination() { + return jmsDestination; + } + + public int getJmsDeliveryMode() { + return jmsDeliveryMode; + } + + public boolean getJmsRedelivered() { + return jmsRedelivered; + } + + public String getJmsType() { + return jmsType; + } + + public long getJmsExpiration() { + return jmsExpiration; + } + + public int getJmsPriority() { + return jmsPriority; + } + + public Map<String, Object> getProperties() { + return this.properties; + } + + public String getPayload() { + return this.text; + } + + @Override + public int hashCode() { + return Objects.hash(jmsMessageID, + jmsTimestamp, + jmsCorrelationID, + jmsReplyTo, + jmsDestination, + jmsDeliveryMode, + jmsRedelivered, + jmsType, + jmsExpiration, + jmsPriority, + properties, + text); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof JmsRecord) { + JmsRecord other = (JmsRecord) obj; + return jmsMessageID.equals(other.jmsMessageID) + && jmsDestination.equals(other.jmsDestination) + && jmsDeliveryMode == other.jmsDeliveryMode + && jmsRedelivered == other.jmsRedelivered + && jmsExpiration == other.jmsExpiration + && jmsPriority == other.jmsPriority + && properties.equals(other.properties) + && text.equals(other.text); + } else { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java new file mode 100644 index 0000000..3845b07 --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing from JMS (Java Messaging Service). + */ +package org.apache.beam.sdk.io.jms; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java new file mode 100644 index 0000000..020794c --- /dev/null +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Tests of {@link JmsIO}. + */ +@RunWith(JUnit4.class) +public class JmsIOTest { + + + private static final String BROKER_URL = "vm://localhost"; + + private BrokerService broker; + private ConnectionFactory connectionFactory; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker.addConnector(BROKER_URL); + broker.setBrokerName("localhost"); + broker.start(); + + // create JMS connection factory + connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadMessages() throws Exception { + + // produce message + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("test")); + TextMessage message = session.createTextMessage("This Is A Test"); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.close(); + session.close(); + connection.close(); + + Pipeline pipeline = TestPipeline.create(); + + // read from the queue + PCollection<JmsRecord> output = pipeline.apply( + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withQueue("test") + .withMaxNumRecords(5)); + + PAssert + .thatSingleton(output.apply("Count", Count.<JmsRecord>globally())) + .isEqualTo(new Long(5)); + pipeline.run(); + + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + Message msg = consumer.receiveNoWait(); + Assert.assertNull(msg); + } + + @Test + @Category(NeedsRunner.class) + public void testWriteMessage() throws Exception { + + Pipeline pipeline = TestPipeline.create(); + + ArrayList<String> data = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + data.add("Message " + i); + } + pipeline.apply(Create.of(data)) + .apply(JmsIO.write().withConnectionFactory(connectionFactory).withQueue("test")); + + pipeline.run(); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + int count = 0; + while (consumer.receive(1000) != null) { + count++; + } + Assert.assertEquals(100, count); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cf75568/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index e55f08a..fce1f65 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -35,6 +35,7 @@ <modules> <module>google-cloud-platform</module> <module>hdfs</module> + <module>jms</module> <module>kafka</module> </modules>