This is an automated email from the ASF dual-hosted git repository. samaitra pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push: new 1f4cd8c IGNITE-12364 Migrate JMS module to ignite-extensions - Fixes #20. 1f4cd8c is described below commit 1f4cd8c38deb2e5ccf5a33985eb3f2b9e3c00704 Author: samaitra <saikat.mai...@gmail.com> AuthorDate: Mon Aug 31 19:50:44 2020 -0500 IGNITE-12364 Migrate JMS module to ignite-extensions - Fixes #20. Signed-off-by: samaitra <saikat.mai...@gmail.com> --- modules/jms11-ext/README.txt | 29 + modules/jms11-ext/licenses/apache-2.0.txt | 202 ++++++ .../modules/core/src/test/config/log4j-test.xml | 97 +++ .../modules/core/src/test/config/tests.properties | 153 +++++ modules/jms11-ext/pom.xml | 104 +++ .../apache/ignite/stream/jms11/JmsStreamer.java | 565 ++++++++++++++++ .../ignite/stream/jms11/MessageTransformer.java | 41 ++ .../apache/ignite/stream/jms11/package-info.java | 22 + .../ignite/stream/jms11/IgniteJmsStreamerTest.java | 708 +++++++++++++++++++++ .../stream/jms11/IgniteJmsStreamerTestSuite.java | 34 + .../ignite/stream/jms11/TestTransformers.java | 127 ++++ .../apache/ignite/stream/jms11/package-info.java | 22 + pom.xml | 1 + 13 files changed, 2105 insertions(+) diff --git a/modules/jms11-ext/README.txt b/modules/jms11-ext/README.txt new file mode 100644 index 0000000..0b61d55 --- /dev/null +++ b/modules/jms11-ext/README.txt @@ -0,0 +1,29 @@ +Apache Ignite JMS 1.1 Module +---------------------------- + +Apache Ignite JMS 1.1 module provides a streamer to consume JMS queue and topic messages into +Apache Ignite caches. + +Importing Apache Ignite JMS 1.1 Module In Maven Project +-------------------------------------------------------- + +If you are using Maven to manage dependencies of your project, you can add the JMS 1.1 module +dependency like this (replace '${ignite.version}' with actual Ignite version you are +interested in): + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-jms11-ext</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> diff --git a/modules/jms11-ext/licenses/apache-2.0.txt b/modules/jms11-ext/licenses/apache-2.0.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/modules/jms11-ext/licenses/apache-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/modules/jms11-ext/modules/core/src/test/config/log4j-test.xml b/modules/jms11-ext/modules/core/src/test/config/log4j-test.xml new file mode 100755 index 0000000..b78fa9c --- /dev/null +++ b/modules/jms11-ext/modules/core/src/test/config/log4j-test.xml @@ -0,0 +1,97 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" + "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd"> +<!-- + Log4j configuration. +--> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false"> + <!-- + Logs System.out messages to console. + --> + <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender"> + <!-- Log to STDOUT. --> + <param name="Target" value="System.out"/> + + <!-- Log from DEBUG and higher. --> + <param name="Threshold" value="DEBUG"/> + + <!-- The default pattern: Date Priority [Category] Message\n --> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/> + </layout> + + <!-- Do not log beyond INFO level. --> + <filter class="org.apache.log4j.varia.LevelRangeFilter"> + <param name="levelMin" value="DEBUG"/> + <param name="levelMax" value="INFO"/> + </filter> + </appender> + + <!-- + Logs all System.err messages to console. + --> + <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender"> + <!-- Log to STDERR. --> + <param name="Target" value="System.err"/> + + <!-- Log from WARN and higher. --> + <param name="Threshold" value="WARN"/> + + <!-- The default pattern: Date Priority [Category] Message\n --> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/> + </layout> + </appender> + + <!-- + Logs all output to specified file. + --> + <appender name="FILE" class="org.apache.log4j.RollingFileAppender"> + <param name="Threshold" value="DEBUG"/> + <param name="File" value="${IGNITE_HOME}/work/log/ignite.log"/> + <param name="Append" value="true"/> + <param name="MaxFileSize" value="10MB"/> + <param name="MaxBackupIndex" value="10"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/> + </layout> + </appender> + + <!-- Disable all open source debugging. --> + <category name="org"> + <level value="INFO"/> + </category> + + <category name="org.eclipse.jetty"> + <level value="INFO"/> + </category> + + <!-- Default settings. --> + <root> + <!-- Print at info by default. --> + <level value="INFO"/> + + <!-- Append to file and console. --> + <appender-ref ref="FILE"/> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="CONSOLE_ERR"/> + </root> +</log4j:configuration> diff --git a/modules/jms11-ext/modules/core/src/test/config/tests.properties b/modules/jms11-ext/modules/core/src/test/config/tests.properties new file mode 100644 index 0000000..18b3606 --- /dev/null +++ b/modules/jms11-ext/modules/core/src/test/config/tests.properties @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Local address to bind to. +local.ip=127.0.0.1 + +# TCP communication port +comm.tcp.port=30010 + +# JBoss JNDI +# JBoss context factory for JNDI connection establishing. +jboss.jndi.context.factory=org.jnp.interfaces.NamingContextFactory +# JBoss specific parameter for JNDI connection establishing. +jboss.jndi.pkg.prefixes=org.jboss.naming:org.jnp.interfaces +# URL of JBoss server for the 1st node. +jboss.jndi.node1.provider.url=jnp://localhost:1199 +# URL of JBoss server for the 2nd node. +jboss.jndi.node2.provider.url=jnp://localhost:1299 +# JBoss Discovery test max wait time. +jboss.disco.test.wait=180000 + +# Deployment configuration paths. +# You will either need to override deploy.uri.dir or supply CLASSES_URI as system property. +# +# Path to keystore with private and public keys. +deploy.uri.secure.keystore=@{IGNITE_HOME}/modules/tests/config/securedeploy/keystore +# Temporary dir where deployment unit stored before deploy. +deploy.uri.tmpdir=${java.io.tmpdir}/gg +# Deployment dir for file scanner test with different types of GAR's. +deploy.uri.file2.path=${java.io.tmpdir}/gg/verification/ +# URI string. +deploy.uri.file2=file://freq=200@localhost/${java.io.tmpdir}/gg/verification/ +# File scanner URI for local file deployment. +deploy.uri.file=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/file/ +# FTP scanner URI for FTP deployment. +deploy.uri.ftp=ftp://ftptest:iddqd@94.72.60.102:21/test/deployment +# Classes scanner URI for classes deployment. Must be overridden for every user. +deploy.uri.cls=${CLASSES_URI} +# Http scanner URI for HTTP deployment. +deploy.uri.http=http://fake.uri +# Http scanner URI for secure SSL HTTPs deployment. +deploy.uri.https=https://fake.uri +# Directory with descriptors to construct GAR files. +deploy.gar.descriptor.dir=modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/META-INF + +# Directory with a number of descriptors for the Ant gar task. +ant.gar.descriptor.dir=modules/extdata/p2p/META-INF +# Temporary directory for the Ant task resulting GAR file. +ant.gar.tmpdir=${java.io.tmpdir}/gg +# The same as p2p.uri.cls but without protocol +ant.gar.srcdir=@{IGNITE_HOME}/modules/extdata/uri/target/classes/ + +# Paths to use in URI deployment SPI tests +urideployment.jar.uri=modules/extdata/uri/target/deploy/uri.jar +urideployment.path.tmp=modules/extdata/uri/target/deploy_tmp/ + +# GAR paths to use in URI deployment SPI tests +ant.urideployment.gar.uri=file://freq=5000@localhost/EXTDATA/uri/target/deploy +ant.urideployment.gar.file=modules/extdata/uri/target/deploy/uri.gar +ant.urideployment.gar.libs-file=modules/extdata/uri/target/deploy2/uri-libs.gar +ant.urideployment.gar.classes-file=modules/extdata/uri/target/deploy2/uri-classes.gar +ant.urideployment.gar.path=modules/extdata/uri/target/deploy/ + +# Classpath directory for GridP2PUserVersionChangeSelfTest +ant.userversion.class.dir=@{IGNITE_HOME}/modules/tests/java/ + +# Multicast discovery self test. +discovery.mbeanserver.selftest.baseport=50000 + +# TCP communication self test. +comm.mbeanserver.selftest.baseport=50100 + +# Kernel tests. +grid.comm.selftest.sender.timeout=1000 +grid.comm.selftest.timeout=10000 + +#P2P tests +#Overwrite this property. It should point to P2P module compilation directory. +p2p.uri.cls=file://localhost/@{IGNITE_HOME}/modules/extdata/p2p/target/classes/ +p2p.uri.cls.second=file://localhost/@{IGNITE_HOME}/modules/extdata/uri/target/classes/ + +# AOP tests. +# Connector port for RMI. +connector.rmi.port=7657 +# Connector port for XFire Web Service. +connector.ws.port=9090 + +# Load test duration in minutes. +load.test.duration=500 +load.test.threadnum=50 +load.test.nodenum=5 + +# Loaders tests +loader.self.test.config=modules/core/src/test/config/loaders/grid-cfg.xml +loader.self.multipletest.config=modules/core/src/test/config/loaders/grid-cfg-2-grids.xml +loader.self.test.jboss.config=modules/core/src/test/config/loaders/grid-cfg.xml + +# WebSphere jmx properties +websphere.jmx.connector.host=localhost +websphere.jmx.connector.port=8880 +websphere.jmx.connector.security=false +websphere.jmx.username= +websphere.jmx.pwd= + +# GlassFish jmx properties for GlassFish Loader +glassfish.jmx.rmi.connector.port=8686 +glassfish.jmx.username=admin +glassfish.jmx.password=adminadmin + +# Tomcat jmx properties for Servlet Loader +tomcat.jmx.rmi.connector.port=1097 + +# Marshaller for tests +#marshaller.class=org.apache.ignite.marshaller.jdk.GridJdkMarshaller + +# EC2 configuration for tests +#amazon.access.key= +#amazon.secret.key= + +# SSH config. +ssh.username=uname +ssh.password=passwd + +# SSL tests keystore. +ssl.keystore.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/server.jks +ssl.keystore.password=123456 + +# node01 signed with trust-one, node02 and node03 by trust-two, node02old is expired +# trust-both contains both CAs +ssl.keystore.node01.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node01.jks +ssl.keystore.node02.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02.jks +ssl.keystore.node03.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node03.jks +ssl.keystore.trustone.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-one.jks +ssl.keystore.trusttwo.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-two.jks +ssl.keystore.trustboth.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/trust-both.jks +ssl.keystore.node02old.path=@{IGNITE_HOME}/modules/clients/src/test/keystore/ca/node02old.jks + +# Hadoop home directory. +hadoop.home=@{HADOOP_HOME} diff --git a/modules/jms11-ext/pom.xml b/modules/jms11-ext/pom.xml new file mode 100644 index 0000000..42b7ac3 --- /dev/null +++ b/modules/jms11-ext/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-extensions-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-jms11-ext</artifactId> + <version>1.0.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <version>${jms.spec.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-tools</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Generate the OSGi MANIFEST.MF for this bundle. --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java new file mode 100644 index 0000000..6e0ce64 --- /dev/null +++ b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.stream.StreamAdapter; + +/** + * Streamer that consumes from a JMS destination and feeds key-value pairs into an {@link IgniteDataStreamer} instance. + * <p> + * This Streamer uses purely JMS semantics and it is not coupled with any JMS implementation. It uses {@link + * MessageListener} to receive messages. You must provide your broker's {@link javax.jms.ConnectionFactory} when + * creating a {@link JmsStreamer}. + * <p> + * You must also provide a {@link MessageTransformer} to convert the incoming message into cache entries. + * <p> + * This Streamer has many features: + * + * <ul> + * <li>Consumes from queues or topics.</li> + * <li>For topics, it supports durable subscriptions.</li> + * <li>Concurrent consumers are supported via the <tt>threads</tt> parameter. When consuming from queues, + * this component will start as many {@link Session} objects with separate {@link MessageListener} instances each, + * therefore achieving <i>native</i> concurrency (in terms of the JMS standard).<br> + * When consuming from topics, obviously we cannot start multiple threads as that would lead us to consume + * duplicate messages. Therefore, we achieve concurrency in a <i>virtualized</i> manner through an internal + * thread pool.</li> + * <li>Transacted sessions are supported through the <tt>transacted</tt> parameter.</li> + * <li>Batched consumption is possible via the <tt>batched</tt> parameter. Depending on the broker, this + * technique can provide a higher throughput as it decreases the amount of message acknowledgement round trips + * that are necessary, albeit at the expense possible duplicate messages (especially if an incident + * occurs in the middle of a transaction).<br> + * Batches are committed when the <tt>batchClosureMillis</tt> time has elapsed, or when a Session has received + * at least <tt>batchClosureSize</tt> messages. Time-based closure fires with the specified frequency and applies to + * all {@link Session}s in parallel. Size-based closure applies individually to each <tt>Session</tt> (as transactions + * are <tt>Session-bound</tt> in JMS, so it will fire when that {@link Session} has processed that many messages. + * Both options are compatible with each other, or you can disable either (see setter documentation), + * but not both.</li> + * <li>Can specify the destination with implementation-specific {@link Destination} objects or with names.</li> + * </ul> + + * + * @author Raul Kripalani + */ +public class JmsStreamer<T extends Message, K, V> extends StreamAdapter<T, K, V> { + + /** Logger. */ + private IgniteLogger log; + + /** + * <i>Compulsory.</i> The message transformer that converts an incoming JMS {@link Message} (or subclass) into one + * or multiple cache entries. + */ + private MessageTransformer<T, K, V> transformer; + + /** The JMS {@link ConnectionFactory} to use. */ + private ConnectionFactory connectionFactory; + + /** Whether to register or not as a durable subscription (for topic consumption). */ + private boolean durableSubscription; + + /** Name of the durable subscription, as required by the JMS specification. */ + private String durableSubscriptionName; + + /** Client ID in case we're using durable subscribers. */ + private String clientId; + + /** The JMS {@link Destination}; takes precedence over destinationName if both are set. */ + private Destination destination; + + /** Name of the destination. */ + private String destinationName; + + /** Whether to consume in a transacted manner. */ + private boolean transacted; + + /** Whether to consume messages in batches. May lead to duplicate consumption. Value <tt>true</tt> implies + * <tt>transacted = true</tt>. */ + private boolean batched; + + /** When using batched consumers, the amount of messages after the batch (transaction) will be committed. */ + private int batchClosureSize = 50; + + /** + * When using batched consumers, the amount of time to wait before the batch (transaction) will be committed. A + * value of 0 or -1 disables timed-based session commits. + */ + private long batchClosureMillis = 1000; + + /** Destination type. */ + private Class<? extends Destination> destinationType = Queue.class; + + /** + * Number of threads to concurrently consume JMS messages. When working with queues, we will start as many {@link + * javax.jms.Session} objects as indicated by this field, i.e. you will get native concurrency. On the other hand, + * when consuming from a topic, for obvious reason we will only start 1 message consumer but we will distribute the + * processing of received messages to as many concurrent threads as indicated. + */ + private int threads = 1; + + /** Whether we are stopped or not. */ + private volatile boolean stopped = true; + + /** JMS Connection. */ + private Connection connection; + + /** Stores the current JMS Sessions. */ + private Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()); + + /** Message consumers. */ + private Set<MessageConsumer> consumers = Collections.newSetFromMap(new ConcurrentHashMap<MessageConsumer, Boolean>()); + + /** Message listeners. */ + private Set<IgniteJmsMessageListener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<IgniteJmsMessageListener, Boolean>()); + + /** The Jms {@link ExceptionListener} to use. */ + private ExceptionListener exceptionListener; + + /** Scheduler for handling {@link #batchClosureMillis}. */ + private ScheduledExecutorService scheduler; + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() throws IgniteException { + if (!stopped) + throw new IgniteException("Attempted to start an already started JMS Streamer"); + + try { + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + + log = getIgnite().log(); + + A.notNull(transformer, "message transformer"); + A.notNull(connectionFactory, "connection factory"); + A.ensure(threads > 0, "threads > 0"); + + // handle batched && transacted parameter interaction + if (batched && !transacted) { + log.warning("Starting a Batched JMS Streamer without transacted flag = true. Setting it automatically."); + transacted = true; + } + + // handle batch completion criteria + if (batched) { + A.ensure(batchClosureMillis > 0 || batchClosureSize > 0, "at least one of batch closure size or " + + "batch closure frequency must be specified when using batch consumption"); + } + + // check the parameters needed for durable subscriptions, if enabled + if (durableSubscription) { + A.notNullOrEmpty(clientId, "client id is compulsory when using durable subscriptions"); + A.notNullOrEmpty(durableSubscriptionName, "durable subscription name is compulsory when using " + + "durable subscriptions"); + } + + // validate the destination; if we have an explicit destination, make sure it's of type Queue or Topic; + // else make sure that the destinationName and the destinationType are valid + if (destination == null) { + A.notNull(destinationType, "destination type"); + A.ensure(destinationType.isAssignableFrom(Queue.class) || destinationType.isAssignableFrom(Topic.class), + "this streamer can only handle Queues or Topics."); + A.notNullOrEmpty(destinationName, "destination or destination name"); + } + else if (destination instanceof Queue) { + destinationType = Queue.class; + } + else if (destination instanceof Topic) { + destinationType = Topic.class; + } + else { + throw new IllegalArgumentException("Invalid destination object. Can only handle Queues or Topics."); + } + + // create a new connection and the client iD if relevant. + connection = connectionFactory.createConnection(); + if (clientId != null && clientId.trim().length() > 0) { + connection.setClientID(clientId.trim()); + } + + connection.setExceptionListener(new IgniteJmsExceptionListener()); + + // build the JMS objects + if (destinationType == Queue.class) { + initializeJmsObjectsForQueue(); + } + else { + initializeJmsObjectsForTopic(); + } + + stopped = false; + + // start the JMS connection + connection.start(); + + // set up the scheduler service for committing batches + if (batched && batchClosureMillis > 0) { + scheduler = Executors.newScheduledThreadPool(1); + scheduler.schedule(new Runnable() { + @Override public void run() { + for (Session session : sessions) { + try { + session.commit(); + if (log.isDebugEnabled()) { + log.debug("Committing session from time-based batch completion [session=" + + session + "]"); + } + } + catch (JMSException ignored) { + log.warning("Error while committing session: from batch time-based completion " + + "[session=" + session + "]"); + } + } + for (IgniteJmsMessageListener ml : listeners) { + ml.resetBatchCounter(); + } + } + }, batchClosureMillis, TimeUnit.MILLISECONDS); + } + + } + catch (Throwable t) { + throw new IgniteException("Exception while initializing JmsStreamer", t); + } + + } + + /** + * Stops streamer. + */ + public void stop() throws IgniteException { + if (stopped) + throw new IgniteException("Attempted to stop an already stopped JMS Streamer"); + + try { + stopped = true; + + if (scheduler != null && !scheduler.isShutdown()) { + scheduler.shutdown(); + scheduler = null; + } + + connection.stop(); + connection.close(); + + for (Session s : sessions) { + s.close(); + } + + sessions.clear(); + consumers.clear(); + listeners.clear(); + } + catch (Throwable t) { + throw new IgniteException("Exception while stopping JmsStreamer", t); + } + } + + /** + * Sets the JMS {@link ConnectionFactory}. + * + * @param connectionFactory JMS {@link ConnectionFactory} for this streamer to use. + */ + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + /** + * <i>Compulsory.</i> The {@link MessageTransformer} that converts an incoming JMS {@link Message} (or subclass) + * into one or multiple cache entries. + * + * @param transformer The implementation of the MessageTransformer to use. + */ + public void setTransformer(MessageTransformer<T, K, V> transformer) { + this.transformer = transformer; + } + + /** + * Sets the JMS {@link Destination} explicitly. Takes precedence over destinationName if both are set. + * + * @param destination JMS {@link Destination} if setting it explicitly. + */ + public void setDestination(Destination destination) { + this.destination = destination; + } + + /** + * Sets the name of the JMS destination to consume from. + * + * @param destinationName The name of the destination; will be passed on directly to the broker. + */ + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + /** + * Sets the type of the destination to create, when used in combination with {@link #setDestinationName(String)}. It + * can be an interface or the implementation class specific to the broker. + * + * @param destinationType The class representing the destination type. Suggested values: {@link Queue} or {@link + * Topic}. <i>Compulsory</i> if using {@link #destinationName}. + * @see Queue + * @see Topic + */ + public void setDestinationType(Class<? extends Destination> destinationType) { + this.destinationType = destinationType; + } + + /** + * Sets the number of threads to concurrently consume JMS messages. <p> When working with queues, we will start as + * many {@link javax.jms.Session} objects as indicated by this field, i.e. you will get native concurrency. <p> On + * the other hand, when consuming from a topic, for obvious reason we will only start 1 message consumer but we will + * distribute the processing of received messages to as many concurrent threads as indicated. + * + * @param threads Number of threads to use. Default: <tt>1</tt>. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets the client ID of the JMS {@link Connection}. + * + * @param clientId Client ID in case we're using durable subscribers. Default: none. + */ + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** + * A <tt>true</tt> value is only accepted in combination with topic consumption. + * + * @param durableSubscription Whether or not to use durable subscriptions. Default: <tt>false</tt>. + */ + public void setDurableSubscription(boolean durableSubscription) { + this.durableSubscription = durableSubscription; + } + + /** + * Instructs the streamer whether to use local JMS transactions or not. + * + * @param transacted Whether to consume or not in a transacted manner. Default: <tt>false</tt>. + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + /** + * Batch consumption leverages JMS Transactions to minimise round trips to the broker. <p> Rather than ACKing every + * single message received, they will be received in the context of a JMS transaction which will be committed once + * the indicated batch closure size or batch closure time has elapsed. <p> Warning: May lead to duplicate + * consumption. + * + * @param batched Whether to consume messages in batches. Value <tt>true</tt> implies <tt>transacted = true</tt>. + * Default: <tt>false</tt>. + * @see #setBatchClosureMillis(long) + * @see #setBatchClosureSize(int) + */ + public void setBatched(boolean batched) { + this.batched = batched; + } + + /** + * When using batched consumption, sets the amount of messages that will be received before a batch is committed. + * + * @param batchClosureSize The amount of messages processed before a batch is committed. Default: <tt>50</tt>. + */ + public void setBatchClosureSize(int batchClosureSize) { + this.batchClosureSize = batchClosureSize; + } + + /** + * When using batched consumption, sets the time in milliseconds that will elapse before a batch is committed. + * + * @param batchClosureMillis Milliseconds before a batch is committed. Default: <tt>1000ms</tt>. + */ + public void setBatchClosureMillis(long batchClosureMillis) { + this.batchClosureMillis = batchClosureMillis; + } + + /** + * When using Durable Subscribers, sets the name of the durable subscriber. It is compulsory. + * + * @param durableSubscriptionName Name of the durable subscriber. Default: none. + */ + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + /** + * Exception listener for queue/topic failures. + * + * @param exceptionListener ExceptionListener interface implementation. + */ + public void setExceptionListener(ExceptionListener exceptionListener) { + this.exceptionListener = exceptionListener; + } + + private void initializeJmsObjectsForTopic() throws JMSException { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Topic topic = (Topic)destination; + + if (destination == null) + topic = session.createTopic(destinationName); + + MessageConsumer consumer = durableSubscription ? session.createDurableSubscriber(topic, durableSubscriptionName) : + session.createConsumer(topic); + + IgniteJmsMessageListener messageListener = new IgniteJmsMessageListener(session, true); + consumer.setMessageListener(messageListener); + + consumers.add(consumer); + sessions.add(session); + listeners.add(messageListener); + } + + private void initializeJmsObjectsForQueue() throws JMSException { + for (int i = 0; i < threads; i++) { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + if (destination == null) + destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + + IgniteJmsMessageListener messageListener = new IgniteJmsMessageListener(session, false); + consumer.setMessageListener(messageListener); + + consumers.add(consumer); + sessions.add(session); + listeners.add(messageListener); + } + } + + private void processMessage(T message) { + final IgniteDataStreamer<K, V> streamer = getStreamer(); + + Map<K, V> entries = transformer.apply(message); + + if (entries == null || entries.size() == 0) + return; + + streamer.addData(entries); + } + + /** + * Message listener for queues. + */ + private class IgniteJmsMessageListener implements MessageListener { + + private Session session; + + private AtomicInteger counter = new AtomicInteger(0); + + private Executor executor; + + public IgniteJmsMessageListener(Session session, boolean createThreadPool) { + this.session = session; + + // if we don't need a thread pool, create a dummy one that executes the task synchronously + //noinspection NullableProblems + this.executor = createThreadPool ? Executors.newFixedThreadPool(threads) : new Executor() { + @Override public void execute(Runnable command) { + command.run(); + } + }; + } + + @Override public void onMessage(final Message message) { + if (stopped) { + return; + } + + executor.execute(new Runnable() { + @Override @SuppressWarnings("unchecked") + public void run() { + processMessage((T)message); + if (batched) { + // batch completion may be handled by timer only + if (batchClosureSize <= 0) + return; + + else if (counter.incrementAndGet() >= batchClosureSize) { + try { + session.commit(); + counter.set(0); + } + catch (Exception e) { + log.warning("Could not commit JMS session upon completion of batch.", e); + } + } + } + else if (transacted) { + try { + session.commit(); + } + catch (JMSException e) { + log.warning("Could not commit JMS session (non-batched).", e); + } + } + } + }); + + } + + public void resetBatchCounter() { + counter.set(0); + } + } + + /** + * Exception listener for JmsExceptions. + */ + private class IgniteJmsExceptionListener implements ExceptionListener { + /** {@inheritDoc} */ + @Override public void onException(JMSException e) { + U.error(log, "Caught JMS internal exception.", e); + + if (exceptionListener != null) + exceptionListener.onException(e); + } + } +} diff --git a/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java new file mode 100644 index 0000000..568040b --- /dev/null +++ b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import java.util.Map; +import javax.jms.Message; +import org.jetbrains.annotations.Nullable; + +/** + * Implement this interface to transform from a {@link Message} to a set of cache entries in the form of a {@link Map}. + * + * @param <T> The type of JMS Message. + * @param <K> The type of the cache key. + * @param <V> The type of the cache value. + * @author Raul Kripalani + */ +public interface MessageTransformer<T extends Message, K, V> { + /** + * Transformation function. + * + * @param message The message received from the JMS broker. + * @return Set of cache entries to add to the cache. It could be empty or null if the message should be skipped. + */ + @Nullable Map<K, V> apply(T message); + +} diff --git a/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/package-info.java b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/package-info.java new file mode 100644 index 0000000..52aa97b --- /dev/null +++ b/modules/jms11-ext/src/main/java/org/apache/ignite/stream/jms11/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of JMS queue and topic messages consumer. + */ + +package org.apache.ignite.stream.jms11; diff --git a/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java new file mode 100644 index 0000000..ff33d09 --- /dev/null +++ b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java @@ -0,0 +1,708 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + +/** + * Test for {@link JmsStreamer}. Tests both queues and topics. + * + * @author Raul Kripalani + */ +public class IgniteJmsStreamerTest extends GridCommonAbstractTest { + /** */ + private static final int CACHE_ENTRY_COUNT = 100; + + /** */ + private static final String QUEUE_NAME = "ignite.test.queue"; + + /** */ + private static final String TOPIC_NAME = "ignite.test.topic"; + + /** */ + private static final Map<String, String> TEST_DATA = new HashMap<>(); + + static { + for (int i = 1; i <= CACHE_ENTRY_COUNT; i++) + TEST_DATA.put(Integer.toString(i), "v" + i); + } + + /** */ + private BrokerService broker; + + /** */ + private ConnectionFactory connFactory; + + /** Constructor. */ + public IgniteJmsStreamerTest() { + super(true); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + @Override public void beforeTest() throws Exception { + grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setPersistenceAdapter(null); + broker.setPersistenceFactory(null); + + PolicyMap plcMap = new PolicyMap(); + PolicyEntry plc = new PolicyEntry(); + + plc.setQueuePrefetch(1); + + broker.setDestinationPolicy(plcMap); + broker.getDestinationPolicy().setDefaultEntry(plc); + broker.setSchedulerSupport(false); + + broker.start(true); + + connFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + } + + /** + * @throws Exception Iff ailed. + */ + @Override public void afterTest() throws Exception { + grid().cache(DEFAULT_CACHE_NAME).clear(); + + broker.stop(); + broker.deleteAllMessages(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueueFromName() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestinationType(Queue.class); + jmsStreamer.setDestinationName(QUEUE_NAME); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testTopicFromName() throws JMSException, InterruptedException { + Destination dest = new ActiveMQTopic(TOPIC_NAME); + + // should not produced messages until subscribed to the topic; otherwise they will be missed because this is not + // a durable subscriber (for which a dedicated test exists) + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestinationType(Topic.class); + jmsStreamer.setDestinationName(TOPIC_NAME); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // produce messages + produceObjectMessages(dest, false); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueueFromExplicitDestination() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + // start the streamer + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testTopicFromExplicitDestination() throws JMSException, InterruptedException { + Destination dest = new ActiveMQTopic(TOPIC_NAME); + + // should not produced messages until subscribed to the topic; otherwise they will be missed because this is not + // a durable subscriber (for which a dedicated test exists) + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // produce messages + produceObjectMessages(dest, false); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testInsertMultipleCacheEntriesFromOneMessage() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce A SINGLE MESSAGE, containing all data, into the queue + produceStringMessages(dest, true); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDurableSubscriberStartStopStart() throws Exception { + Destination dest = new ActiveMQTopic(TOPIC_NAME); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + jmsStreamer.setDurableSubscription(true); + jmsStreamer.setClientId(Long.toString(System.currentTimeMillis())); + jmsStreamer.setDurableSubscriptionName("ignite-test-durable"); + + // we start the streamer so that the durable subscriber registers itself + jmsStreamer.start(); + + // we stop it immediately + jmsStreamer.stop(); + + // we assert that there are no clients of the broker (to make sure we disconnected properly) + assertEquals(0, broker.getCurrentConnections()); + + // we send messages while we're still away + produceStringMessages(dest, false); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueueMessagesConsumedInBatchesCompletionSizeBased() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + jmsStreamer.setBatched(true); + jmsStreamer.setBatchClosureSize(99); + + // disable time-based session commits + jmsStreamer.setBatchClosureMillis(0); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + // we expect all entries to be loaded, but still one (uncommitted) message should remain in the queue + // as observed by the broker + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); + assertEquals(1, qStats.getMessages().getCount()); + assertEquals(1, qStats.getInflight().getCount()); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueueMessagesConsumedInBatchesCompletionTimeBased() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + jmsStreamer.setBatched(true); + jmsStreamer.setBatchClosureMillis(2000); + // disable size-based session commits + jmsStreamer.setBatchClosureSize(0); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); + + jmsStreamer.start(); + + // all messages are still inflight + assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount()); + assertEquals(0, qStats.getDequeues().getCount()); + + // wait a little bit + Thread.sleep(100); + + // all messages are still inflight + assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount()); + assertEquals(0, qStats.getDequeues().getCount()); + + // now let the scheduler execute + Thread.sleep(2100); + + // all messages are committed + assertEquals(0, qStats.getMessages().getCount()); + assertEquals(CACHE_ENTRY_COUNT, qStats.getDequeues().getCount()); + + latch.await(5, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testGenerateNoEntries() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + // override the transformer with one that generates no cache entries + jmsStreamer.setTransformer(TestTransformers.generateNoEntries()); + jmsStreamer.setDestination(dest); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(1); + + jmsStreamer.start(); + + // no cache PUT events were received in 3 seconds, i.e. CountDownLatch does not fire + assertFalse(latch.await(3, TimeUnit.SECONDS)); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testTransactedSessionNoBatching() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setTransacted(true); + jmsStreamer.setDestination(dest); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * @throws Exception If failed. + */ + @Test + public void testQueueMultipleThreads() throws Exception { + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(dest, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(dest); + jmsStreamer.setThreads(5); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + // start the streamer + jmsStreamer.start(); + + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics(); + assertEquals(5, qStats.getConsumers().getCount()); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + // assert that all consumers received messages - given that the prefetch is 1 + for (Subscription subscription : broker.getBroker().getDestinationMap().get(dest).getConsumers()) + assertTrue(subscription.getDequeueCounter() > 0); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + /** + * Test for ExceptionListener functionality. + * + * @throws Exception If fails. + */ + @Test + public void testExceptionListener() throws Exception { + // restart broker with auth plugin + if (broker.isStarted()) + broker.stop(); + + broker.waitUntilStopped(); + + broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(new ArrayList())}); + + broker.start(true); + + connFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + + final List<Throwable> lsnrExceptions = new LinkedList<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + Destination dest = new ActiveMQQueue(QUEUE_NAME); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + + jmsStreamer.setExceptionListener(new ExceptionListener() { + @Override public void onException(JMSException e) { + System.out.println("ERROR"); + + lsnrExceptions.add(e); + + latch.countDown(); + } + }); + + jmsStreamer.setDestination(dest); + + GridTestUtils.assertThrowsWithCause(new Callable<Void>() { + @Override public Void call() throws Exception { + jmsStreamer.start(); + + return null; + } + }, SecurityException.class); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertTrue(lsnrExceptions.size() > 0); + + GridTestUtils.assertThrowsWithCause(new Callable<Void>() { + @Override public Void call() throws Exception { + jmsStreamer.stop(); + + return null; + } + }, IgniteException.class); + } + } + + /** + * + */ + private void assertAllCacheEntriesLoaded() { + // Get the cache and check that the entries are present + IgniteCache<String, String> cache = grid().cache(DEFAULT_CACHE_NAME); + for (Map.Entry<String, String> entry : TEST_DATA.entrySet()) + assertEquals(entry.getValue(), cache.get(entry.getKey())); + } + + @SuppressWarnings("unchecked") + private <T extends Message> JmsStreamer<T, String, String> newJmsStreamer(Class<T> type, + IgniteDataStreamer<String, String> dataStreamer) { + + JmsStreamer<T, String, String> jmsStreamer = new JmsStreamer<>(); + jmsStreamer.setIgnite(grid()); + jmsStreamer.setStreamer(dataStreamer); + jmsStreamer.setConnectionFactory(connFactory); + + if (type == ObjectMessage.class) + jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forObjectMessage()); + else + jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forTextMessage()); + + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(10); + return jmsStreamer; + } + + /** + * @param expect Expected events number. + * @return Event receive latch. + */ + private CountDownLatch subscribeToPutEvents(int expect) { + Ignite ignite = grid(); + + // Listen to cache PUT events and expect as many as messages as test data items + final CountDownLatch latch = new CountDownLatch(expect); + + @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, CacheEvent>() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(cb, null, EVT_CACHE_OBJECT_PUT); + return latch; + } + + private void produceObjectMessages(Destination dest, boolean singleMsg) throws JMSException { + Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer mp = ses.createProducer(dest); + + HashSet<TestTransformers.TestObject> set = new HashSet<>(); + + for (String key : TEST_DATA.keySet()) { + TestTransformers.TestObject to = new TestTransformers.TestObject(key, TEST_DATA.get(key)); + set.add(to); + } + + int messagesSent; + + if (singleMsg) { + mp.send(ses.createObjectMessage(set)); + messagesSent = 1; + } + else { + for (TestTransformers.TestObject to : set) + mp.send(ses.createObjectMessage(to)); + + messagesSent = set.size(); + } + + if (dest instanceof Queue) { + try { + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest) + .getDestinationStatistics().getMessages().getCount()); + } + catch (Exception e) { + fail(e.toString()); + } + } + + } + + private void produceStringMessages(Destination dest, boolean singleMsg) throws JMSException { + Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer mp = ses.createProducer(dest); + + HashSet<String> set = new HashSet<>(); + + for (String key : TEST_DATA.keySet()) + set.add(key + "," + TEST_DATA.get(key)); + + int messagesSent; + + if (singleMsg) { + StringBuilder sb = new StringBuilder(); + + for (String s : set) + sb.append(s).append("|"); + + sb.deleteCharAt(sb.length() - 1); + mp.send(ses.createTextMessage(sb.toString())); + messagesSent = 1; + + } + else { + for (String s : set) + mp.send(ses.createTextMessage(s)); + + messagesSent = set.size(); + } + + if (dest instanceof Queue) { + try { + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest) + .getDestinationStatistics().getMessages().getCount()); + } + catch (Exception e) { + fail(e.toString()); + } + } + + } +} diff --git a/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java new file mode 100644 index 0000000..303b6b2 --- /dev/null +++ b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * JMS streamer tests. + * + * @author Raul Kripalani + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + IgniteJmsStreamerTest.class +}) +public class IgniteJmsStreamerTestSuite { + // No-op. +} diff --git a/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java new file mode 100644 index 0000000..16b312a --- /dev/null +++ b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +/** + * Test transformers for JmsStreamer tests. + * + * @author Raul Kripalani + */ +public class TestTransformers { + + /** + * Returns a transformer for JMS {@link TextMessage}s, capable of extracting many tuples from a single message, + * if pipe characters are encountered. + * + * @return + */ + public static MessageTransformer<TextMessage, String, String> forTextMessage() { + return new MessageTransformer<TextMessage, String, String>() { + @Override public Map<String, String> apply(TextMessage message) { + final Map<String, String> answer = new HashMap<>(); + String text; + try { + text = message.getText(); + } + catch (JMSException e) { + e.printStackTrace(); + return Collections.emptyMap(); + } + for (String s : text.split("\\|")) { + String[] tokens = s.split(","); + answer.put(tokens[0], tokens[1]); + } + return answer; + } + }; + } + + /** + * Returns a transformer for JMS {@link ObjectMessage}s, capable of extracting many tuples from a single message, + * if the payload is a {@link Collection}. + * + * @return + */ + public static MessageTransformer<ObjectMessage, String, String> forObjectMessage() { + return new MessageTransformer<ObjectMessage, String, String>() { + @SuppressWarnings("unchecked") + @Override public Map<String, String> apply(ObjectMessage message) { + Object object; + try { + object = message.getObject(); + } + catch (JMSException e) { + e.printStackTrace(); + return Collections.emptyMap(); + } + + final Map<String, String> answer = new HashMap<>(); + if (object instanceof Collection) { + for (TestObject to : (Collection<TestObject>)object) + answer.put(to.getKey(), to.getValue()); + + } + else if (object instanceof TestObject) { + TestObject to = (TestObject)object; + answer.put(to.getKey(), to.getValue()); + } + return answer; + } + }; + } + + public static MessageTransformer<TextMessage, String, String> generateNoEntries() { + return new MessageTransformer<TextMessage, String, String>() { + @Override public Map<String, String> apply(TextMessage message) { + return null; + } + }; + } + + public static class TestObject implements Serializable { + private static final long serialVersionUID = -7332027566186690945L; + + private String key; + + private String value; + + public TestObject(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + } + +} diff --git a/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/package-info.java b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/package-info.java new file mode 100644 index 0000000..52aa97b --- /dev/null +++ b/modules/jms11-ext/src/test/java/org/apache/ignite/stream/jms11/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of JMS queue and topic messages consumer. + */ + +package org.apache.ignite.stream.jms11; diff --git a/pom.xml b/pom.xml index 9d41302..2aa8af9 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ <module>modules/mqtt-ext</module> <module>modules/storm-ext</module> <module>modules/camel-ext</module> + <module>modules/jms11-ext</module> </modules> <profiles>