storm git commit: fix version in storm-jms pom
Repository: storm Updated Branches: refs/heads/1.0.x-branch c1e59d412 -> 3cf2410eb fix version in storm-jms pom Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3cf2410e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3cf2410e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3cf2410e Branch: refs/heads/1.0.x-branch Commit: 3cf2410ebae0ceaaa0edaf70685cb431fd17581e Parents: c1e59d4 Author: P. Taylor Goetz Authored: Thu Oct 6 14:52:12 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Oct 6 14:52:12 2016 -0400 -- external/storm-jms/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3cf2410e/external/storm-jms/pom.xml -- diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml index 7364394..029f914 100644 --- a/external/storm-jms/pom.xml +++ b/external/storm-jms/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm -1.1.0-SNAPSHOT +1.0.3-SNAPSHOT ../../pom.xml
[1/2] storm git commit: merge storm-jms into 1.0.x-branch
Repository: storm Updated Branches: refs/heads/1.0.x-branch 73b00e5d5 -> c1e59d412 http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java -- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java new file mode 100644 index 000..ea571fc --- /dev/null +++ b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.spout; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.storm.jms.JmsTupleProducer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class MockTupleProducer implements JmsTupleProducer { +private static final long serialVersionUID = 1L; + +@Override +public Values toTuple(Message msg) throws JMSException { +if (msg instanceof TextMessage) { +String json = ((TextMessage) msg).getText(); +return new Values(json); +} else { +return null; +} +} + +@Override +public void declareOutputFields(OutputFieldsDeclarer declarer) { +declarer.declare(new Fields("json")); +} + +} http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/external/storm-jms/core/src/test/resources/jndi.properties -- diff --git a/external/storm-jms/core/src/test/resources/jndi.properties b/external/storm-jms/core/src/test/resources/jndi.properties new file mode 100644 index 000..af19521 --- /dev/null +++ b/external/storm-jms/core/src/test/resources/jndi.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory +java.naming.provider.url = vm://localhost?broker.persistent=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/external/storm-jms/examples/README.markdown -- diff --git a/external/storm-jms/examples/README.markdown b/external/storm-jms/examples/README.markdown new file mode 100644 index 000..7a4d8f0 --- /dev/null +++ b/external/storm-jms/examples/README.markdown @@ -0,0 +1,12 @@ +## About Storm JMS Examples +This project contains a simple storm topology that illustrates the usage of "storm-jms". + +To build: + +`mvn clean install` + +The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory: + +`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar` + + http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/external/storm-jms/examples/pom.xml -- diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml new file mode 100644 index 000..cb78f02 --- /dev/null +++ b/external/storm-jms/examples/pom.xml @@ -0,0 +1,151 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +
[2/2] storm git commit: merge storm-jms into 1.0.x-branch
merge storm-jms into 1.0.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1e59d41 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1e59d41 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1e59d41 Branch: refs/heads/1.0.x-branch Commit: c1e59d41237180f93f7499946a464a3855187b48 Parents: 73b00e5 Author: P. Taylor GoetzAuthored: Thu Oct 6 13:19:06 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Oct 6 14:49:41 2016 -0400 -- docs/images/Storm-JMS-Example.png | Bin 0 -> 40432 bytes docs/index.md | 1 + docs/storm-jms-example.md | 111 + docs/storm-jms-spring.md| 25 ++ docs/storm-jms.md | 33 ++ external/storm-jms/README.markdown | 25 ++ external/storm-jms/core/pom.xml | 95 + .../apache/storm/jms/JmsMessageProducer.java| 46 +++ .../java/org/apache/storm/jms/JmsProvider.java | 48 +++ .../org/apache/storm/jms/JmsTupleProducer.java | 58 +++ .../java/org/apache/storm/jms/bolt/JmsBolt.java | 217 ++ .../apache/storm/jms/spout/JmsMessageID.java| 58 +++ .../org/apache/storm/jms/spout/JmsSpout.java| 382 + .../org/apache/storm/jms/trident/JmsBatch.java | 27 ++ .../org/apache/storm/jms/trident/JmsState.java | 129 ++ .../storm/jms/trident/JmsStateFactory.java | 40 ++ .../apache/storm/jms/trident/JmsUpdater.java| 38 ++ .../storm/jms/trident/TridentJmsSpout.java | 409 +++ .../apache/storm/jms/spout/JmsSpoutTest.java| 88 .../apache/storm/jms/spout/MockJmsProvider.java | 62 +++ .../jms/spout/MockSpoutOutputCollector.java | 55 +++ .../storm/jms/spout/MockTupleProducer.java | 47 +++ .../core/src/test/resources/jndi.properties | 18 + external/storm-jms/examples/README.markdown | 12 + external/storm-jms/examples/pom.xml | 151 +++ .../storm/jms/example/ExampleJmsTopology.java | 131 ++ .../apache/storm/jms/example/GenericBolt.java | 116 ++ .../storm/jms/example/JsonTupleProducer.java| 58 +++ .../storm/jms/example/SpringJmsProvider.java| 74 .../src/main/resources/jms-activemq.xml | 53 +++ 
.../src/main/resources/log4j.properties | 29 ++ external/storm-jms/pom.xml | 70 pom.xml | 5 +- storm-dist/binary/src/main/assembly/binary.xml | 22 +- 34 files changed, 2731 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/docs/images/Storm-JMS-Example.png -- diff --git a/docs/images/Storm-JMS-Example.png b/docs/images/Storm-JMS-Example.png new file mode 100644 index 000..80e3493 Binary files /dev/null and b/docs/images/Storm-JMS-Example.png differ http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index 4601c31..00b08a1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -86,6 +86,7 @@ Trident is an alternative interface to Storm. It provides exactly-once processin * [Apache Solr Integration](storm-solr.html) * [Apache Cassandra Integration](storm-cassandra.html) * [JDBC Integration](storm-jdbc.html) +* [JMS Integration](storm-jms.html) * [Redis Integration](storm-redis.html) * [Event Hubs Intergration](storm-eventhubs.html) * [Elasticsearch Integration](storm-elasticsearch.html) http://git-wip-us.apache.org/repos/asf/storm/blob/c1e59d41/docs/storm-jms-example.md -- diff --git a/docs/storm-jms-example.md b/docs/storm-jms-example.md new file mode 100644 index 000..29cadf1 --- /dev/null +++ b/docs/storm-jms-example.md @@ -0,0 +1,111 @@ +--- +title: Storm JMS Integration +layout: documentation +documentation: true +--- +## Example Storm JMS Topology + +The storm-jms source code contains an example project (in the "examples" directory) +builds a multi-bolt/multi-spout topology (depicted below) that uses the JMS Spout and JMS Bolt components. + +![picture alt](images/Storm-JMS-Example.png "Example JMS Topology") + +The green components represent instances of the storm-jms components. White components represent +"standard" Storm bolts (in the example these bolts are instances of `GenericBolt` which simply logs +information about the tuples it receives and emits). 
+ +Grey arrows represent JMS messages, while black arrows represent the flow of Storm tuple objects. + +### JMS Transactions and Gauranteed Processing
[2/2] storm git commit: merge storm-jms into 1.x-branch
merge storm-jms into 1.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4da2e5bc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4da2e5bc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4da2e5bc Branch: refs/heads/1.x-branch Commit: 4da2e5bc3abe8a6be72dbd770cbfac0876818268 Parents: 311da95 Author: P. Taylor GoetzAuthored: Thu Oct 6 13:19:06 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Oct 6 13:19:06 2016 -0400 -- docs/images/Storm-JMS-Example.png | Bin 0 -> 40432 bytes docs/index.md | 1 + docs/storm-jms-example.md | 111 + docs/storm-jms-spring.md| 25 ++ docs/storm-jms.md | 33 ++ external/storm-jms/README.markdown | 25 ++ external/storm-jms/core/pom.xml | 95 + .../apache/storm/jms/JmsMessageProducer.java| 46 +++ .../java/org/apache/storm/jms/JmsProvider.java | 48 +++ .../org/apache/storm/jms/JmsTupleProducer.java | 58 +++ .../java/org/apache/storm/jms/bolt/JmsBolt.java | 217 ++ .../apache/storm/jms/spout/JmsMessageID.java| 58 +++ .../org/apache/storm/jms/spout/JmsSpout.java| 382 + .../org/apache/storm/jms/trident/JmsBatch.java | 27 ++ .../org/apache/storm/jms/trident/JmsState.java | 129 ++ .../storm/jms/trident/JmsStateFactory.java | 40 ++ .../apache/storm/jms/trident/JmsUpdater.java| 38 ++ .../storm/jms/trident/TridentJmsSpout.java | 409 +++ .../apache/storm/jms/spout/JmsSpoutTest.java| 88 .../apache/storm/jms/spout/MockJmsProvider.java | 62 +++ .../jms/spout/MockSpoutOutputCollector.java | 55 +++ .../storm/jms/spout/MockTupleProducer.java | 47 +++ .../core/src/test/resources/jndi.properties | 18 + external/storm-jms/examples/README.markdown | 12 + external/storm-jms/examples/pom.xml | 151 +++ .../storm/jms/example/ExampleJmsTopology.java | 131 ++ .../apache/storm/jms/example/GenericBolt.java | 116 ++ .../storm/jms/example/JsonTupleProducer.java| 58 +++ .../storm/jms/example/SpringJmsProvider.java| 74 .../src/main/resources/jms-activemq.xml | 53 +++ 
.../src/main/resources/log4j.properties | 29 ++ external/storm-jms/pom.xml | 70 pom.xml | 1 + storm-dist/binary/src/main/assembly/binary.xml | 21 + 34 files changed, 2728 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/docs/images/Storm-JMS-Example.png -- diff --git a/docs/images/Storm-JMS-Example.png b/docs/images/Storm-JMS-Example.png new file mode 100644 index 000..80e3493 Binary files /dev/null and b/docs/images/Storm-JMS-Example.png differ http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index 4601c31..00b08a1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -86,6 +86,7 @@ Trident is an alternative interface to Storm. It provides exactly-once processin * [Apache Solr Integration](storm-solr.html) * [Apache Cassandra Integration](storm-cassandra.html) * [JDBC Integration](storm-jdbc.html) +* [JMS Integration](storm-jms.html) * [Redis Integration](storm-redis.html) * [Event Hubs Intergration](storm-eventhubs.html) * [Elasticsearch Integration](storm-elasticsearch.html) http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/docs/storm-jms-example.md -- diff --git a/docs/storm-jms-example.md b/docs/storm-jms-example.md new file mode 100644 index 000..29cadf1 --- /dev/null +++ b/docs/storm-jms-example.md @@ -0,0 +1,111 @@ +--- +title: Storm JMS Integration +layout: documentation +documentation: true +--- +## Example Storm JMS Topology + +The storm-jms source code contains an example project (in the "examples" directory) +builds a multi-bolt/multi-spout topology (depicted below) that uses the JMS Spout and JMS Bolt components. + +![picture alt](images/Storm-JMS-Example.png "Example JMS Topology") + +The green components represent instances of the storm-jms components. White components represent +"standard" Storm bolts (in the example these bolts are instances of `GenericBolt` which simply logs +information about the tuples it receives and emits). 
+ +Grey arrows represent JMS messages, while black arrows represent the flow of Storm tuple objects. + +### JMS Transactions and Gauranteed Processing +The example is set up
[1/2] storm git commit: merge storm-jms into 1.x-branch
Repository: storm Updated Branches: refs/heads/1.x-branch 311da9578 -> 4da2e5bc3 http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java -- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java new file mode 100644 index 000..ea571fc --- /dev/null +++ b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.spout; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.storm.jms.JmsTupleProducer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class MockTupleProducer implements JmsTupleProducer { +private static final long serialVersionUID = 1L; + +@Override +public Values toTuple(Message msg) throws JMSException { +if (msg instanceof TextMessage) { +String json = ((TextMessage) msg).getText(); +return new Values(json); +} else { +return null; +} +} + +@Override +public void declareOutputFields(OutputFieldsDeclarer declarer) { +declarer.declare(new Fields("json")); +} + +} http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/external/storm-jms/core/src/test/resources/jndi.properties -- diff --git a/external/storm-jms/core/src/test/resources/jndi.properties b/external/storm-jms/core/src/test/resources/jndi.properties new file mode 100644 index 000..af19521 --- /dev/null +++ b/external/storm-jms/core/src/test/resources/jndi.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory +java.naming.provider.url = vm://localhost?broker.persistent=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/external/storm-jms/examples/README.markdown -- diff --git a/external/storm-jms/examples/README.markdown b/external/storm-jms/examples/README.markdown new file mode 100644 index 000..7a4d8f0 --- /dev/null +++ b/external/storm-jms/examples/README.markdown @@ -0,0 +1,12 @@ +## About Storm JMS Examples +This project contains a simple storm topology that illustrates the usage of "storm-jms". + +To build: + +`mvn clean install` + +The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory: + +`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar` + + http://git-wip-us.apache.org/repos/asf/storm/blob/4da2e5bc/external/storm-jms/examples/pom.xml -- diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml new file mode 100644 index 000..cb78f02 --- /dev/null +++ b/external/storm-jms/examples/pom.xml @@ -0,0 +1,151 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +
[22/50] [abbrv] storm git commit: [maven-release-plugin] prepare for next development iteration
[maven-release-plugin] prepare for next development iteration Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6b3cb99 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6b3cb99 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6b3cb99 Branch: refs/heads/master Commit: f6b3cb99d872daeadc80da31bb5d8ba4495c2e8c Parents: 0e46a53 Author: P. Taylor Goetz Authored: Wed Feb 19 17:31:36 2014 -0500 Committer: P. Taylor Goetz Committed: Wed Feb 19 17:31:36 2014 -0500 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/f6b3cb99/pom.xml -- diff --git a/pom.xml b/pom.xml index 1647e53..af6bc54 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.9.0 + 0.9.1-SNAPSHOT Storm JMS Storm JMS Components
[33/50] [abbrv] storm git commit: remove unnecessary TridentJmsMessageProducer; Update README
remove unnecessary TridentJmsMessageProducer; Update README Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d2319ca Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d2319ca Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d2319ca Branch: refs/heads/master Commit: 0d2319ca892ff90c3cb6a3248e4ef3b482a53100 Parents: a12f643 Author: P. Taylor GoetzAuthored: Thu Aug 25 16:13:10 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 16:13:10 2016 -0400 -- README.markdown | 9 +++- .../apache/storm/jms/JmsMessageProducer.java| 4 ++-- .../org/apache/storm/jms/trident/JmsState.java | 5 +++-- .../jms/trident/TridentJmsMessageProducer.java | 22 4 files changed, 8 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/0d2319ca/README.markdown -- diff --git a/README.markdown b/README.markdown index a0e8500..05fb6ea 100644 --- a/README.markdown +++ b/README.markdown @@ -1,7 +1,7 @@ ## About Storm JMS Storm JMS is a generic framework for integrating JMS messaging within the Storm framework. -The [Storm Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) explains what storm is and why it was built. +The [Storm Rationale page](https://storm.apache.org/releases/1.0.2/Rationale.html) explains what storm is and why it was built. Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt. @@ -21,11 +21,8 @@ The JMS Bolt component allows for data within a Storm topology to be published t A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives. 
## Project Location -Primary development of storm-cassandra will take place at: -https://github.com/ptgoetz/storm-cassandra - -Point/stable (non-SNAPSHOT) release souce code will be pushed to: -https://github.com/nathanmarz/storm-contrib +Primary development of storm-jms will take place at: +https://github.com/ptgoetz/storm-jms Maven artifacts for releases will be available on maven central. http://git-wip-us.apache.org/repos/asf/storm/blob/0d2319ca/src/main/java/org/apache/storm/jms/JmsMessageProducer.java -- diff --git a/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/src/main/java/org/apache/storm/jms/JmsMessageProducer.java index 74676af..f77b6e1 100644 --- a/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ b/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -6,7 +6,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; /** * JmsMessageProducer implementations are responsible for translating * a org.apache.storm.tuple.Values instance into a @@ -28,5 +28,5 @@ public interface JmsMessageProducer extends Serializable{ * @return * @throws JMSException */ - public Message toMessage(Session session, Tuple input) throws JMSException; + public Message toMessage(Session session, ITuple input) throws JMSException; } http://git-wip-us.apache.org/repos/asf/storm/blob/0d2319ca/src/main/java/org/apache/storm/jms/trident/JmsState.java -- diff --git a/src/main/java/org/apache/storm/jms/trident/JmsState.java b/src/main/java/org/apache/storm/jms/trident/JmsState.java index 3159cfa..bfb78b5 100644 --- a/src/main/java/org/apache/storm/jms/trident/JmsState.java +++ b/src/main/java/org/apache/storm/jms/trident/JmsState.java @@ -17,6 +17,7 @@ */ package org.apache.storm.jms.trident; +import org.apache.storm.jms.JmsMessageProducer; import org.apache.storm.jms.JmsProvider; import org.apache.storm.topology.FailedException; import 
org.slf4j.Logger; @@ -45,7 +46,7 @@ public class JmsState implements State { public static class Options implements Serializable { private JmsProvider jmsProvider; -private TridentJmsMessageProducer msgProducer; +private JmsMessageProducer msgProducer; private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; private boolean jmsTransactional = true; @@ -54,7 +55,7 @@ public class JmsState implements State { return this; } -public Options withMessageProducer(TridentJmsMessageProducer msgProducer) { +public Options withMessageProducer(JmsMessageProducer msgProducer) { this.msgProducer = msgProducer; return this;
[25/50] [abbrv] storm git commit: Jms trident state that writes messages to jms as part of trident topology.
Jms trident state that writes messages to jms as part of trident topology. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a9e7339b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a9e7339b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a9e7339b Branch: refs/heads/master Commit: a9e7339b7450f036ee60020d61d52f213b8ed00f Parents: 183f74f Author: Parth BrahmbhattAuthored: Wed Aug 6 09:37:52 2014 -0700 Committer: Parth Brahmbhatt Committed: Wed Aug 6 09:37:52 2014 -0700 -- .../contrib/jms/TridentJmsMessageProducer.java | 23 .../storm/contrib/jms/trident/JmsState.java | 129 +++ .../contrib/jms/trident/JmsStateFactory.java| 40 ++ .../storm/contrib/jms/trident/JmsUpdater.java | 38 ++ 4 files changed, 230 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a9e7339b/src/main/java/backtype/storm/contrib/jms/TridentJmsMessageProducer.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/TridentJmsMessageProducer.java b/src/main/java/backtype/storm/contrib/jms/TridentJmsMessageProducer.java new file mode 100644 index 000..3f2ddf2 --- /dev/null +++ b/src/main/java/backtype/storm/contrib/jms/TridentJmsMessageProducer.java @@ -0,0 +1,23 @@ +package backtype.storm.contrib.jms; + +import backtype.storm.tuple.Tuple; +import storm.trident.tuple.TridentTuple; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.io.Serializable; + +public interface TridentJmsMessageProducer extends Serializable{ + + /** +* Translate a backtype.storm.tuple.TridentTuple object +* to a javax.jms.Messagehttp://git-wip-us.apache.org/repos/asf/storm/blob/a9e7339b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java new file mode 100644 index 000..5f0bc58 --- /dev/null +++ 
b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.contrib.jms.trident; + +import backtype.storm.contrib.jms.JmsProvider; +import backtype.storm.contrib.jms.TridentJmsMessageProducer; +import backtype.storm.topology.FailedException; +import backtype.storm.tuple.Values; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.operation.TridentCollector; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import javax.jms.*; +import java.io.Serializable; +import java.lang.IllegalStateException; +import java.util.List; +import java.util.Map; + +public class JmsState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(JmsState.class); + +private Options options; +private Connection connection; +private Session session; +private MessageProducer messageProducer; + +protected JmsState(Options options) { +this.options = options; +} + +public static class Options implements Serializable { +private JmsProvider jmsProvider; +private TridentJmsMessageProducer msgProducer; +private int 
jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; +private boolean jmsTransactional = true; + +public Options withJmsProvider(JmsProvider provider) { +this.jmsProvider = provider; +return this; +} + +public Options withMessageProducer(TridentJmsMessageProducer msgProducer) { +this.msgProducer = msgProducer; +return this; +} + +public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { +this.jmsAcknowledgeMode = jmsAcknowledgeMode; +
[49/50] [abbrv] storm git commit: add storm-jms to binary distribution
add storm-jms to binary distribution Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/668d4ab6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/668d4ab6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/668d4ab6 Branch: refs/heads/master Commit: 668d4ab6a8d2c3a98674fefce44147be315e73d4 Parents: fd22d74 Author: P. Taylor Goetz Authored: Wed Oct 5 16:48:27 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 16:48:27 2016 -0400 -- storm-dist/binary/src/main/assembly/binary.xml | 21 + 1 file changed, 21 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/668d4ab6/storm-dist/binary/src/main/assembly/binary.xml -- diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index 4d7693b..8020039 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -404,6 +404,27 @@ storm*jar + +${project.basedir}/../../external/storm-jms +external/storm-jms + +README.* + + + + ${project.basedir}/../../external/storm-jms/core/target +external/storm-jms + +storm*jar + + + + ${project.basedir}/../../external/storm-jms/examples/target +external/storm-jms + +storm*jar + +
[50/50] [abbrv] storm git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce7ba756 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce7ba756 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce7ba756 Branch: refs/heads/master Commit: ce7ba756903e129157b232787b5eb42195b2ce98 Parents: 668d4ab 06a7f54 Author: P. Taylor Goetz Authored: Thu Oct 6 11:07:24 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Oct 6 11:07:24 2016 -0400 -- CHANGELOG.md| 1 + conf/defaults.yaml | 2 +- docs/Pacemaker.md | 17 ++- storm-core/src/jvm/org/apache/storm/Config.java | 6 +- .../storm/cluster/PaceMakerStateStorage.java| 68 +++ .../cluster/PaceMakerStateStorageFactory.java | 40 +-- .../netty/KerberosSaslClientHandler.java| 6 +- .../netty/KerberosSaslNettyClient.java | 4 +- .../apache/storm/pacemaker/PacemakerClient.java | 66 +-- .../storm/pacemaker/PacemakerClientHandler.java | 11 +- .../storm/pacemaker/PacemakerClientPool.java| 113 +++ .../pacemaker/PacemakerConnectionException.java | 24 .../pacemaker/codec/ThriftNettyClientCodec.java | 7 +- .../MockedPaceMakerStateStorageFactory.java | 32 -- .../storm/PaceMakerStateStorageFactoryTest.java | 41 +-- 15 files changed, 287 insertions(+), 151 deletions(-) --
[20/50] [abbrv] storm git commit: add Apache header to .properties files
add Apache header to .properties files Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b058e4b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b058e4b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b058e4b Branch: refs/heads/master Commit: 6b058e4b2695a05d4cb1df716c1eaccbdb65a3e9 Parents: a64d936 Author: P. Taylor GoetzAuthored: Mon Feb 17 22:14:33 2014 -0500 Committer: P. Taylor Goetz Committed: Mon Feb 17 22:14:33 2014 -0500 -- examples/src/main/resources/log4j.properties | 16 src/test/resources/jndi.properties | 16 2 files changed, 32 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6b058e4b/examples/src/main/resources/log4j.properties -- diff --git a/examples/src/main/resources/log4j.properties b/examples/src/main/resources/log4j.properties index 31a50d6..079b195 100644 --- a/examples/src/main/resources/log4j.properties +++ b/examples/src/main/resources/log4j.properties @@ -1,3 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/storm/blob/6b058e4b/src/test/resources/jndi.properties -- diff --git a/src/test/resources/jndi.properties b/src/test/resources/jndi.properties index 5631e35..af19521 100644 --- a/src/test/resources/jndi.properties +++ b/src/test/resources/jndi.properties @@ -1,2 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url = vm://localhost?broker.persistent=false \ No newline at end of file
[32/50] [abbrv] storm git commit: update Storm version to 1.0.2
update Storm version to 1.0.2 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a12f6436 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a12f6436 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a12f6436 Branch: refs/heads/master Commit: a12f6436734b5fa4a90854b1462d78efef71d70a Parents: d152d72 Author: P. Taylor GoetzAuthored: Thu Aug 25 15:54:44 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 15:54:44 2016 -0400 -- pom.xml | 10 ++-- .../apache/storm/jms/JmsMessageProducer.java| 7 +++--- .../org/apache/storm/jms/JmsTupleProducer.java | 4 ++-- .../java/org/apache/storm/jms/bolt/JmsBolt.java | 14 ++-- .../apache/storm/jms/spout/JmsMessageID.java| 12 -- .../org/apache/storm/jms/spout/JmsSpout.java| 14 ++-- .../org/apache/storm/jms/trident/JmsState.java | 8 +++ .../storm/jms/trident/JmsStateFactory.java | 6 ++--- .../apache/storm/jms/trident/JmsUpdater.java| 8 +++ .../jms/trident/TridentJmsMessageProducer.java | 3 +-- .../storm/jms/trident/TridentJmsSpout.java | 24 ++-- .../apache/storm/jms/spout/JmsSpoutTest.java| 9 +--- .../jms/spout/MockSpoutOutputCollector.java | 7 +- .../storm/jms/spout/MockTupleProducer.java | 6 ++--- 14 files changed, 60 insertions(+), 72 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a12f6436/pom.xml -- diff --git a/pom.xml b/pom.xml index aa87b75..aca9129 100644 --- a/pom.xml +++ b/pom.xml @@ -43,13 +43,7 @@ scm:git:g...@github.com:ptgoetz/storm-jms.git :g...@github.com:ptgoetz/storm-jms.git - - - - clojars.org - http://clojars.org/repo - - + @@ -61,7 +55,7 @@ - 0.9.3 + 1.0.2 http://git-wip-us.apache.org/repos/asf/storm/blob/a12f6436/src/main/java/org/apache/storm/jms/JmsMessageProducer.java -- diff --git a/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/src/main/java/org/apache/storm/jms/JmsMessageProducer.java index e9ad52d..74676af 100644 --- a/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ 
b/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -6,11 +6,10 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.tuple.Tuple; /** * JmsMessageProducer implementations are responsible for translating - * a backtype.storm.tuple.Values instance into a + * a org.apache.storm.tuple.Values instance into a * javax.jms.Message object. * * @@ -21,7 +20,7 @@ import backtype.storm.tuple.Values; public interface JmsMessageProducer extends Serializable{ /** -* Translate a backtype.storm.tuple.Tuple object +* Translate a org.apache.storm.tuple.Tuple object * to a javax.jms.Messagehttp://git-wip-us.apache.org/repos/asf/storm/blob/a12f6436/src/main/java/org/apache/storm/jms/JmsTupleProducer.java -- diff --git a/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java index 96b027d..8146db3 100644 --- a/src/main/java/org/apache/storm/jms/JmsTupleProducer.java +++ b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -5,8 +5,8 @@ import java.io.Serializable; import javax.jms.JMSException; import javax.jms.Message; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; /** * Interface to define classes that can produce a Storm Values objects http://git-wip-us.apache.org/repos/asf/storm/blob/a12f6436/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java -- diff --git a/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java index 896f932..50a4e01 100644 --- a/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java +++ b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java @@ -10,19 +10,19 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; -import 
backtype.storm.topology.base.BaseRichBolt; +import
[02/50] [abbrv] storm git commit: upgraded to storm 0.8.1
upgraded to storm 0.8.1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d6bae43 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d6bae43 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d6bae43 Branch: refs/heads/master Commit: 8d6bae43b596ebedf9b664cf8f54cd0fa622035e Parents: eda3c10 Author: Ofir HamerAuthored: Fri Nov 16 15:56:54 2012 + Committer: Ofir Hamer Committed: Fri Nov 16 15:56:54 2012 + -- examples/pom.xml| 4 ++-- .../backtype/storm/contrib/jms/example/GenericBolt.java | 4 ++-- pom.xml | 9 +++-- src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java | 6 +++--- .../java/backtype/storm/contrib/jms/spout/JmsSpout.java | 6 +++--- .../storm/contrib/jms/spout/MockSpoutOutputCollector.java | 6 +- 6 files changed, 22 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/8d6bae43/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 99fdb19..39fd3d1 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -14,7 +14,7 @@ 2.5.6 - 0.6.2 + 0.8.1 @@ -52,7 +52,7 @@ com.github.ptgoetz storm-jms - 0.1.0-SNAPSHOT + 0.8.1-SNAPSHOT org.apache.activemq http://git-wip-us.apache.org/repos/asf/storm/blob/8d6bae43/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java -- diff --git a/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java b/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java index bd0dada..92a21f7 100644 --- a/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java +++ b/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java @@ -2,12 +2,12 @@ package backtype.storm.contrib.jms.example; import java.util.Map; +import backtype.storm.topology.base.BaseRichBolt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; -import 
backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; @@ -22,7 +22,7 @@ import backtype.storm.tuple.Tuple; * */ @SuppressWarnings("serial") -public class GenericBolt implements IRichBolt { +public class GenericBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class); private OutputCollector collector; private boolean autoAck = false; http://git-wip-us.apache.org/repos/asf/storm/blob/8d6bae43/pom.xml -- diff --git a/pom.xml b/pom.xml index e9ce530..f4db48b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.2.2-SNAPSHOT + 0.8.1-SNAPSHOT Storm JMS Storm JMS Components @@ -38,7 +38,7 @@ - 0.6.2 + 0.8.1 @@ -53,6 +53,11 @@ geronimo-jms_1.1_spec 1.1.1 + + org.slf4j + slf4j-log4j12 + 1.5.8 + junit junit http://git-wip-us.apache.org/repos/asf/storm/blob/8d6bae43/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java b/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java index 487aa02..a8c15c5 100644 --- a/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java +++ b/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java @@ -10,6 +10,7 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; +import backtype.storm.topology.base.BaseRichBolt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,6 @@ import backtype.storm.contrib.jms.JmsMessageProducer; import backtype.storm.contrib.jms.JmsProvider; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; -import
[01/50] [abbrv] storm git commit: [maven-release-plugin] prepare for next development iteration
Repository: storm Updated Branches: refs/heads/master 06a7f54cb -> ce7ba7569 [maven-release-plugin] prepare for next development iteration Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eda3c109 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eda3c109 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eda3c109 Branch: refs/heads/master Commit: eda3c10913e7a8631d917bb626cbd443b06eb18e Parents: 3e1cd87 Author: P. Taylor Goetz Authored: Wed May 2 14:47:22 2012 -0400 Committer: P. Taylor Goetz Committed: Wed May 2 14:47:22 2012 -0400 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/eda3c109/pom.xml -- diff --git a/pom.xml b/pom.xml index 747649b..e9ce530 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.2.1 + 0.2.2-SNAPSHOT Storm JMS Storm JMS Components
[39/50] [abbrv] storm git commit: merge storm-jms to external/storm-jms
http://git-wip-us.apache.org/repos/asf/storm/blob/a82f910a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java -- diff --cc external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java index 000,000..3e3f253 new file mode 100644 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java @@@ -1,0 -1,0 +1,393 @@@ ++package org.apache.storm.jms.trident; ++ ++import java.util.ArrayList; ++import java.util.List; ++import java.util.Map; ++import java.util.concurrent.LinkedBlockingQueue; ++ ++import javax.jms.Connection; ++import javax.jms.ConnectionFactory; ++import javax.jms.Destination; ++import javax.jms.JMSException; ++import javax.jms.Message; ++import javax.jms.MessageConsumer; ++import javax.jms.MessageListener; ++import javax.jms.Session; ++ ++import org.apache.storm.jms.JmsProvider; ++import org.apache.storm.jms.JmsTupleProducer; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import org.apache.storm.trident.operation.TridentCollector; ++import org.apache.storm.trident.spout.ITridentSpout; ++import org.apache.storm.trident.topology.TransactionAttempt; ++import org.apache.storm.Config; ++import org.apache.storm.generated.StreamInfo; ++import org.apache.storm.task.TopologyContext; ++import org.apache.storm.topology.OutputFieldsGetter; ++import org.apache.storm.tuple.Fields; ++import org.apache.storm.tuple.Values; ++import org.apache.storm.utils.RotatingMap; ++import org.apache.storm.utils.Utils; ++ ++/** ++ * Trident implementation of the JmsSpout, based on code provided by P. 
Taylor Goetz - https://github.com/ptgoetz ++ * ++ * @author Andy Toone for Metabroadcast ++ * ++ */ ++public class TridentJmsSpout implements ITridentSpout { ++ ++public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; ++ ++public static final int DEFAULT_BATCH_SIZE = 1000; ++ ++private static final long serialVersionUID = -3469351154693356655L; ++ ++private JmsTupleProducer tupleProducer; ++ ++private JmsProvider jmsProvider; ++ ++private int jmsAcknowledgeMode; ++ ++private String name; ++ ++private static int nameIndex = 1; ++ ++/** ++ * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE ++ */ ++public TridentJmsSpout() { ++this.name = "JmsSpout_"+(nameIndex++); ++this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; ++} ++ ++/** ++ * Set the name for this spout, to improve log identification ++ * @param name The name to be used in log messages ++ * @return This spout ++ */ ++public TridentJmsSpout named(String name) { ++this.name = name; ++return this; ++} ++ ++/** ++ * Set the JmsProvider ++ * implementation that this Spout will use to connect to ++ * a JMS javax.jms.Desination ++ * ++ * @param provider ++ */ ++public TridentJmsSpout withJmsProvider(JmsProvider provider){ ++this.jmsProvider = provider; ++return this; ++} ++ ++/** ++ * Set the JmsTupleProducer ++ * implementation that will convert javax.jms.Message ++ * object to backtype.storm.tuple.Values objects ++ * to be emitted. ++ * ++ * @param tupleProducer ++ * @return This spout ++ */ ++public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) { ++this.tupleProducer = tupleProducer; ++return this; ++} ++ ++/** ++ * Set the JMS acknowledge mode for messages being processed by this spout. 
++ * ++ * Possible values: ++ * ++ * javax.jms.Session.AUTO_ACKNOWLEDGE ++ * javax.jms.Session.CLIENT_ACKNOWLEDGE ++ * javax.jms.Session.DUPS_OK_ACKNOWLEDGE ++ * ++ * @param jmsAcknowledgeMode The chosen acknowledge mode ++ * @return This spout ++ * @throws IllegalArgumentException if the mode is not recognized ++ */ ++public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) { ++toDeliveryModeString(jmsAcknowledgeMode); ++this.jmsAcknowledgeMode = jmsAcknowledgeMode; ++return this; ++} ++ ++/** ++ * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if ++ * the mode is not recognized. ++ * ++ * Possible values: ++ * ++ * javax.jms.Session.AUTO_ACKNOWLEDGE ++ * javax.jms.Session.CLIENT_ACKNOWLEDGE ++ * javax.jms.Session.DUPS_OK_ACKNOWLEDGE ++ * ++ * @param acknowledgeMode A valid JMS acknowledge mode ++ * @return A friendly string describing the acknowledge mode ++ * @throws IllegalArgumentException if the mode is not recognized ++ */ ++private static final String toDeliveryModeString(int acknowledgeMode) { ++switch (acknowledgeMode) { ++case
[42/50] [abbrv] storm git commit: storm-jms: split into multi-module project
http://git-wip-us.apache.org/repos/asf/storm/blob/4a2f50cf/external/storm-jms/examples/pom.xml -- diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml index f9af27b..cb78f02 100644 --- a/external/storm-jms/examples/pom.xml +++ b/external/storm-jms/examples/pom.xml @@ -16,65 +16,64 @@ limitations under the License. --> http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> - 4.0.0 - backtype.storm.contrib - storm-jms-examples - 0.1-SNAPSHOT - Storm JMS Examples - Storm JMS Examples - - - clojars.org - http://clojars.org/repo - - - - 2.5.6 - 0.9.3 - - - - org.springframework - spring-beans - ${spring.version} - - - org.springframework - spring-core - ${spring.version} - - - org.springframework - spring-context - ${spring.version} - - - org.springframework - spring-jms - ${spring.version} - - - org.apache.xbean - xbean-spring - 3.7 - - - org.apache.storm - storm-core - ${storm.version} - - provided - - - com.github.ptgoetz - storm-jms - 0.9.2-SNAPSHOT - - - org.apache.activemq - activemq-core - 5.4.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.storm +storm-jms-parent +2.0.0-SNAPSHOT +../pom.xml + + + +storm-jms-examples + + +2.5.6 + + + +org.springframework +spring-beans +${spring.version} + + +org.springframework +spring-core +${spring.version} + + +org.springframework +spring-context +${spring.version} + + +org.springframework +spring-jms +${spring.version} + + +org.apache.xbean +xbean-spring +3.7 + + +org.apache.storm +storm-core +${project.version} + +provided + + +org.apache.storm +storm-jms +${project.version} + + +org.apache.activemq +activemq-core +5.4.0 org.slf4j @@ -85,78 +84,68 @@ log4j - - - - - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - - - - - - make-assembly - package - - single - - - - 
- - - - org.codehaus.mojo - exec-maven-plugin - 1.2.1 - - - - exec -
[38/50] [abbrv] storm git commit: [maven-release-plugin] prepare for next development iteration
[maven-release-plugin] prepare for next development iteration Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aab6acdf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aab6acdf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aab6acdf Branch: refs/heads/master Commit: aab6acdf316c48bf566e37776790116f2794199b Parents: 6112d64 Author: P. Taylor Goetz Authored: Thu Aug 25 16:29:30 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 16:29:30 2016 -0400 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/aab6acdf/pom.xml -- diff --git a/pom.xml b/pom.xml index 1aeb092..890a9ab 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 1.0.2 + 1.0.3-SNAPSHOT Storm JMS Storm JMS Components
[47/50] [abbrv] storm git commit: storm-jms: apply Apache license headers
storm-jms: apply Apache license headers Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96431b31 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96431b31 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96431b31 Branch: refs/heads/master Commit: 96431b31c4b06dfb2e69b0828cd0e35bd5903eb4 Parents: 6a57e16 Author: P. Taylor GoetzAuthored: Wed Oct 5 14:26:28 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 14:26:28 2016 -0400 -- .../apache/storm/jms/JmsMessageProducer.java| 17 +++ .../java/org/apache/storm/jms/JmsProvider.java | 17 +++ .../org/apache/storm/jms/JmsTupleProducer.java | 17 +++ .../java/org/apache/storm/jms/bolt/JmsBolt.java | 17 +++ .../apache/storm/jms/spout/JmsMessageID.java| 17 +++ .../org/apache/storm/jms/spout/JmsSpout.java| 17 +++ .../org/apache/storm/jms/trident/JmsBatch.java | 17 +++ .../storm/jms/trident/TridentJmsSpout.java | 17 +++ .../apache/storm/jms/example/GenericBolt.java | 150 +-- .../storm/jms/example/SpringJmsProvider.java| 3 +- 10 files changed, 212 insertions(+), 77 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/96431b31/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java index 6f26d89..4932929 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.jms; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/96431b31/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java index 5d96bb8..d976326 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.storm.jms; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/96431b31/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java index 7d42ab5..0bbb3a0 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache
[23/50] [abbrv] storm git commit: fix to acking logic to make sure we don't lose messages
fix to acking logic to make sure we don't lose messages Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e18ec1f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e18ec1f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e18ec1f Branch: refs/heads/master Commit: 6e18ec1fedbc4b7c206fb305ed8b298c30fe5d96 Parents: f6b3cb9 Author: P. Taylor GoetzAuthored: Mon Jul 14 13:45:54 2014 -0400 Committer: P. Taylor Goetz Committed: Mon Jul 14 13:45:54 2014 -0400 -- .../storm/contrib/jms/spout/JmsMessageID.java | 75 .../storm/contrib/jms/spout/JmsSpout.java | 47 +++- 2 files changed, 104 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6e18ec1f/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java b/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java new file mode 100644 index 000..437f5ec --- /dev/null +++ b/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java @@ -0,0 +1,75 @@ +package backtype.storm.contrib.jms.spout; + +import javax.jms.Message; +import java.io.Serializable; +import java.util.TreeSet; + +/** + * Created by tgoetz on 7/14/14. 
+ */ +public class JmsMessageID implements Comparable, Serializable { + +private String jmsID; + +private Long sequence; + +//private Message message; + +public JmsMessageID(long sequence, String jmsID){ +this.jmsID = jmsID; +this.sequence = sequence; +} + +//public void setMessage(Message message){ +//this.message = message; +//} +// +//public Message getMessage(){ +//return this.message; +//} + +public String getJmsID(){ +return this.jmsID; +} + +@Override +public int compareTo(JmsMessageID jmsMessageID) { +return (int)(this.sequence - jmsMessageID.sequence); +} + +@Override +public int hashCode() { +return this.sequence.hashCode(); +} + +@Override +public boolean equals(Object o) { +if(o instanceof JmsMessageID){ +JmsMessageID id = (JmsMessageID)o; +return this.jmsID.equals(id.jmsID); +} else { +return false; +} +} + +public String toString(){ +return String.valueOf(this.sequence); +} + + +public static void main(String[] args) { +TreeSet set = new TreeSet(); +set.add(new JmsMessageID(7, "bar")); +set.add(new JmsMessageID(1, "barfoo")); + +set.add(new JmsMessageID(10, "foobar")); +set.add(new JmsMessageID(3, "foo")); + +for(JmsMessageID id : set){ +System.out.println(id); +} + + + +} +} http://git-wip-us.apache.org/repos/asf/storm/blob/6e18ec1f/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java b/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java index 2164d52..a2bdb4f 100644 --- a/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java +++ b/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java @@ -1,9 +1,7 @@ package backtype.storm.contrib.jms.spout; import java.io.Serializable; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -60,7 +58,9 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener { private JmsProvider jmsProvider; private LinkedBlockingQueue queue; - private ConcurrentHashMap pendingMessages; + private TreeSet toCommit; +private HashMap pendingMessages; +private long messageSequence = 0; private SpoutOutputCollector collector; @@ -167,7 +167,8 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { " secs. This could lead to a message replay flood!"); } this.queue = new LinkedBlockingQueue(); - this.pendingMessages = new ConcurrentHashMap (); + this.toCommit = new TreeSet(); +this.pendingMessages = new HashMap (); this.collector = collector; try { ConnectionFactory cf = this.jmsProvider.connectionFactory(); @@ -216,12 +217,14 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
[18/50] [abbrv] storm git commit: update dependency exclusions to get tests passing
update dependency exclusions to get tests passing Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7794295 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7794295 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7794295 Branch: refs/heads/master Commit: c7794295f9c9bbe3b488629c30998538369ec36e Parents: 1c74af2 Author: P. Taylor GoetzAuthored: Tue Jan 21 15:23:30 2014 -0500 Committer: P. Taylor Goetz Committed: Tue Jan 21 15:23:30 2014 -0500 -- examples/pom.xml | 14 -- pom.xml | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c7794295/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 405ba7a..7932bfa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -14,7 +14,7 @@ 2.5.6 - 0.8.2 + 0.9.0.1 @@ -52,12 +52,22 @@ com.github.ptgoetz storm-jms - 0.8.2-SNAPSHOT + 0.9.0-SNAPSHOT org.apache.activemq activemq-core 5.4.0 + + +org.slf4j +slf4j-api + + +log4j +log4j + + http://git-wip-us.apache.org/repos/asf/storm/blob/c7794295/pom.xml -- diff --git a/pom.xml b/pom.xml index 7011aab..ab348de 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.8.2-SNAPSHOT + 0.9.0-SNAPSHOT Storm JMS Storm JMS Components
[28/50] [abbrv] storm git commit: fix acknowledgeMode bug in trident spout
fix acknowledgeMode bug in trident spout Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fc7f30e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fc7f30e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fc7f30e Branch: refs/heads/master Commit: 3fc7f30ed693354f014f00948c527e0f2273c66c Parents: 6212947 Author: P. Taylor Goetz Authored: Mon Oct 27 15:51:16 2014 -0400 Committer: P. Taylor Goetz Committed: Mon Oct 27 15:51:16 2014 -0400 -- examples/src/main/resources/jms-activemq.xml | 2 +- src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java | 3 +-- .../java/backtype/storm/contrib/jms/trident/TridentJmsSpout.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/3fc7f30e/examples/src/main/resources/jms-activemq.xml -- diff --git a/examples/src/main/resources/jms-activemq.xml b/examples/src/main/resources/jms-activemq.xml index b720ae3..1a845b8 100644 --- a/examples/src/main/resources/jms-activemq.xml +++ b/examples/src/main/resources/jms-activemq.xml @@ -1,4 +1,4 @@ - +
[37/50] [abbrv] storm git commit: [maven-release-plugin] prepare release storm-jms-1.0.2
[maven-release-plugin] prepare release storm-jms-1.0.2 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6112d647 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6112d647 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6112d647 Branch: refs/heads/master Commit: 6112d6474524237cbc44f3430d20f6d1d419eae8 Parents: bfbd6ed Author: P. Taylor Goetz Authored: Thu Aug 25 16:29:27 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 16:29:27 2016 -0400 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6112d647/pom.xml -- diff --git a/pom.xml b/pom.xml index a144f44..1aeb092 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 1.0.2-SNAPSHOT + 1.0.2 Storm JMS Storm JMS Components
[16/50] [abbrv] storm git commit: remove duplicate section from previous pull request.
remove duplicate section from previous pull request. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/42297bff Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/42297bff Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/42297bff Branch: refs/heads/master Commit: 42297bffcbb0259c92a47aca7ba1e6f243cf41f3 Parents: c725840 Author: P. Taylor Goetz Authored: Tue Jan 21 14:36:52 2014 -0500 Committer: P. Taylor Goetz Committed: Tue Jan 21 14:36:52 2014 -0500 -- pom.xml | 6 -- 1 file changed, 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/42297bff/pom.xml -- diff --git a/pom.xml b/pom.xml index b95ca6f..cf5c1be 100644 --- a/pom.xml +++ b/pom.xml @@ -82,12 +82,6 @@ - - - clojars.org - http://clojars.org/repo - -
[15/50] [abbrv] storm git commit: Merge pull request #7 from AndyToone/Trident-JmsSpout
Merge pull request #7 from AndyToone/Trident-JmsSpout Trident jms spout Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7258403 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7258403 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7258403 Branch: refs/heads/master Commit: c7258403edf047d72042ec68c2e9a7de3924f0e5 Parents: 2520fc2 ab47497 Author: P. Taylor Goetz Authored: Thu Nov 21 20:13:50 2013 -0800 Committer: P. Taylor Goetz Committed: Thu Nov 21 20:13:50 2013 -0800 -- pom.xml | 9 + .../backtype/storm/contrib/jms/JmsBatch.java| 10 + .../storm/contrib/jms/TridentJmsSpout.java | 391 +++ 3 files changed, 410 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c7258403/pom.xml --
[24/50] [abbrv] storm git commit: cleanup and upgrade to storm 0.9.2
cleanup and upgrade to storm 0.9.2 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/183f74fd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/183f74fd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/183f74fd Branch: refs/heads/master Commit: 183f74fd8a9c66619ea568cbe77ec5a3c43afceb Parents: 6e18ec1 Author: P. Taylor GoetzAuthored: Mon Jul 14 14:30:37 2014 -0400 Committer: P. Taylor Goetz Committed: Mon Jul 14 14:30:37 2014 -0400 -- examples/pom.xml| 12 ++-- pom.xml | 6 +++--- .../storm/contrib/jms/spout/JmsMessageID.java | 20 3 files changed, 9 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/183f74fd/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 4b4f6de..a7cbd6b 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -31,7 +31,7 @@ 2.5.6 - 0.9.0.1 + 0.9.2-incubating @@ -60,8 +60,8 @@ 3.7 - storm - storm + org.apache.storm + storm-core ${storm.version} provided @@ -69,7 +69,7 @@ com.github.ptgoetz storm-jms - 0.9.0-SNAPSHOT + 0.9.2-SNAPSHOT org.apache.activemq @@ -140,8 +140,8 @@ - storm - storm + org.apache.storm + storm-core ${storm.version} jar http://git-wip-us.apache.org/repos/asf/storm/blob/183f74fd/pom.xml -- diff --git a/pom.xml b/pom.xml index af6bc54..235fbdc 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.9.1-SNAPSHOT + 0.9.2-SNAPSHOT Storm JMS Storm JMS Components @@ -61,11 +61,11 @@ - 0.9.0.1 + 0.9.2-incubating - storm + org.apache.storm storm-core ${storm.version} http://git-wip-us.apache.org/repos/asf/storm/blob/183f74fd/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java b/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java index 437f5ec..0aee193 100644 --- a/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java +++ 
b/src/main/java/backtype/storm/contrib/jms/spout/JmsMessageID.java @@ -52,24 +52,4 @@ public class JmsMessageID implements Comparable, Serializable { } } -public String toString(){ -return String.valueOf(this.sequence); -} - - -public static void main(String[] args) { -TreeSet set = new TreeSet(); -set.add(new JmsMessageID(7, "bar")); -set.add(new JmsMessageID(1, "barfoo")); - -set.add(new JmsMessageID(10, "foobar")); -set.add(new JmsMessageID(3, "foo")); - -for(JmsMessageID id : set){ -System.out.println(id); -} - - - -} }
[11/50] [abbrv] storm git commit: Added repositories section to streamline maven builds.
Added repositories section to streamline maven builds. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/34e6ea79 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/34e6ea79 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/34e6ea79 Branch: refs/heads/master Commit: 34e6ea79cce5f91b8347ffdadda02da8f7a9e05c Parents: 7fc8e93 Author: Paul CoddingAuthored: Mon Oct 14 14:14:24 2013 -0500 Committer: Paul Codding Committed: Mon Oct 14 14:14:24 2013 -0500 -- pom.xml | 11 +-- 1 file changed, 9 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/34e6ea79/pom.xml -- diff --git a/pom.xml b/pom.xml index 3f6cabb..49e03bc 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,14 @@ scm:git:g...@github.com:ptgoetz/storm-jms.git :g...@github.com:ptgoetz/storm-jms.git - + + + + clojars.org + http://clojars.org/repo + + + ptgoetz @@ -85,4 +92,4 @@ - \ No newline at end of file +
[45/50] [abbrv] storm git commit: storm-jms: move main source to submodule
storm-jms: move main source to submodule Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/13f4b07f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/13f4b07f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/13f4b07f Branch: refs/heads/master Commit: 13f4b07f707d50c50abeac910bc0fd144e542070 Parents: 5ede1ee Author: P. Taylor GoetzAuthored: Wed Oct 5 14:06:56 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 14:06:56 2016 -0400 -- external/storm-jms/README.markdown | 32 +- .../apache/storm/jms/JmsMessageProducer.java| 32 -- .../java/org/apache/storm/jms/JmsProvider.java | 30 -- .../org/apache/storm/jms/JmsTupleProducer.java | 41 -- .../java/org/apache/storm/jms/bolt/JmsBolt.java | 201 -- .../apache/storm/jms/spout/JmsMessageID.java| 41 -- .../org/apache/storm/jms/spout/JmsSpout.java| 366 - .../org/apache/storm/jms/trident/JmsBatch.java | 10 - .../org/apache/storm/jms/trident/JmsState.java | 129 -- .../storm/jms/trident/JmsStateFactory.java | 40 -- .../apache/storm/jms/trident/JmsUpdater.java| 38 -- .../storm/jms/trident/TridentJmsSpout.java | 393 --- .../apache/storm/jms/spout/JmsSpoutTest.java| 88 - .../apache/storm/jms/spout/MockJmsProvider.java | 62 --- .../jms/spout/MockSpoutOutputCollector.java | 55 --- .../storm/jms/spout/MockTupleProducer.java | 47 --- .../src/test/resources/jndi.properties | 18 - 17 files changed, 1 insertion(+), 1622 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/13f4b07f/external/storm-jms/README.markdown -- diff --git a/external/storm-jms/README.markdown b/external/storm-jms/README.markdown index 05fb6ea..93f6cbe 100644 --- a/external/storm-jms/README.markdown +++ b/external/storm-jms/README.markdown @@ -1,7 +1,6 @@ ## About Storm JMS Storm JMS is a generic framework for integrating JMS messaging within the Storm framework. 
-The [Storm Rationale page](https://storm.apache.org/releases/1.0.2/Rationale.html) explains what storm is and why it was built. Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt. @@ -20,36 +19,7 @@ The JMS Bolt component allows for data within a Storm topology to be published t A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives. -## Project Location -Primary development of storm-jms will take place at: -https://github.com/ptgoetz/storm-jms -Maven artifacts for releases will be available on maven central. - - -## Documentation - -Documentation and tutorials can be found on the [Storm-JMS wiki](http://github.com/ptgoetz/storm-jms/wiki). - -## License - -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - -## Contributors +## Committer Sponsors * P. 
Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz)) http://git-wip-us.apache.org/repos/asf/storm/blob/13f4b07f/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java -- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java deleted file mode 100644 index f77b6e1..000 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.storm.jms; - -import java.io.Serializable; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; - -import org.apache.storm.tuple.ITuple; -/** - * JmsMessageProducer implementations are responsible for translating - * a org.apache.storm.tuple.Values instance into a - * javax.jms.Message object. - * - * - * - * @author P. Taylor Goetz - * - */ -public interface JmsMessageProducer extends Serializable{
[46/50] [abbrv] storm git commit: storm-jms: remove @author tags and reformat tabs to spaces
storm-jms: remove @author tags and reformat tabs to spaces Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a57e164 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a57e164 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a57e164 Branch: refs/heads/master Commit: 6a57e1640bf7e1001e8ca2f7c1182a2f2a711e31 Parents: 13f4b07 Author: P. Taylor GoetzAuthored: Wed Oct 5 14:12:21 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 14:12:21 2016 -0400 -- .../apache/storm/jms/JmsMessageProducer.java| 33 +- .../java/org/apache/storm/jms/JmsProvider.java | 33 +- .../org/apache/storm/jms/JmsTupleProducer.java | 36 +- .../java/org/apache/storm/jms/bolt/JmsBolt.java | 307 +- .../org/apache/storm/jms/spout/JmsSpout.java| 583 +-- .../storm/jms/trident/TridentJmsSpout.java | 3 +- 6 files changed, 495 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6a57e164/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java index f77b6e1..6f26d89 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -7,26 +7,23 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.storm.tuple.ITuple; + /** * JmsMessageProducer implementations are responsible for translating - * a org.apache.storm.tuple.Values instance into a + * a org.apache.storm.tuple.Values instance into a * javax.jms.Message object. - * - * - * - * @author P. 
Taylor Goetz - * + * */ -public interface JmsMessageProducer extends Serializable{ - - /** -* Translate a org.apache.storm.tuple.Tuple object -* to a javax.jms.Messageorg.apache.storm.tuple.Tuple object + * to a javax.jms.Messagehttp://git-wip-us.apache.org/repos/asf/storm/blob/6a57e164/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java index 5887026..5d96bb8 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java @@ -4,27 +4,28 @@ import java.io.Serializable; import javax.jms.ConnectionFactory; import javax.jms.Destination; + /** * A JmsProvider object encapsulates the ConnectionFactory * and Destination JMS objects the JmsSpout needs to manage * a topic/queue connection over the course of it's lifecycle. - * - * @author P. Taylor Goetz * */ -public interface JmsProvider extends Serializable{ - /** -* Provides the JMS ConnectionFactory -* @return the connection factory -* @throws Exception -*/ - public ConnectionFactory connectionFactory() throws Exception; +public interface JmsProvider extends Serializable { +/** + * Provides the JMS ConnectionFactory + * + * @return the connection factory + * @throws Exception + */ +public ConnectionFactory connectionFactory() throws Exception; - /** -* Provides the Destination (topic or queue) from which the -* JmsSpout will receive messages. -* @return -* @throws Exception -*/ - public Destination destination() throws Exception; +/** + * Provides the Destination (topic or queue) from which the + * JmsSpout will receive messages. 
+ * + * @return + * @throws Exception + */ +public Destination destination() throws Exception; } http://git-wip-us.apache.org/repos/asf/storm/blob/6a57e164/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java -- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java index 8146db3..7d42ab5 100644 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java +++ b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -11,31 +11,31 @@ import org.apache.storm.tuple.Values; /** * Interface to define classes
[35/50] [abbrv] storm git commit: turn off javadoc lint
turn off javadoc lint Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89100c41 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89100c41 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89100c41 Branch: refs/heads/master Commit: 89100c416b3eb47e062d6ce94de761a6799d898f Parents: 1ad1b51 Author: P. Taylor GoetzAuthored: Thu Aug 25 16:24:21 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 16:24:21 2016 -0400 -- pom.xml | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/89100c41/pom.xml -- diff --git a/pom.xml b/pom.xml index 2e835f3..1aeb092 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 1.0.2-SNAPSHOT + 1.0.2 Storm JMS Storm JMS Components @@ -108,6 +108,14 @@ 1.6 + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9 + + -Xdoclint:none + +
[21/50] [abbrv] storm git commit: [maven-release-plugin] prepare release storm-jms-0.9.0
[maven-release-plugin] prepare release storm-jms-0.9.0 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e46a534 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e46a534 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e46a534 Branch: refs/heads/master Commit: 0e46a534ba6ce861e3be60b37fcebf906c911337 Parents: 6b058e4 Author: P. Taylor GoetzAuthored: Wed Feb 19 17:31:34 2014 -0500 Committer: P. Taylor Goetz Committed: Wed Feb 19 17:31:34 2014 -0500 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/0e46a534/pom.xml -- diff --git a/pom.xml b/pom.xml index 06cc279..1647e53 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.9.0-SNAPSHOT + 0.9.0 Storm JMS Storm JMS Components
[04/50] [abbrv] storm git commit: Merge pull request #5 from MailOnline/storm-0.8.1
Merge pull request #5 from MailOnline/storm-0.8.1 upgraded to storm 0.8.1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/75506dc9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/75506dc9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/75506dc9 Branch: refs/heads/master Commit: 75506dc93bf84730e9c10dbb13015eba42a1152d Parents: eda3c10 8d6bae4 Author: P. Taylor GoetzAuthored: Fri Dec 7 06:45:59 2012 -0800 Committer: P. Taylor Goetz Committed: Fri Dec 7 06:45:59 2012 -0800 -- examples/pom.xml| 4 ++-- .../backtype/storm/contrib/jms/example/GenericBolt.java | 4 ++-- pom.xml | 9 +++-- src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java | 6 +++--- .../java/backtype/storm/contrib/jms/spout/JmsSpout.java | 6 +++--- .../storm/contrib/jms/spout/MockSpoutOutputCollector.java | 6 +- 6 files changed, 22 insertions(+), 13 deletions(-) --
[03/50] [abbrv] storm git commit: Add support for sending to temporary reply queues
Add support for sending to temporary reply queues By setting the destination in the message producer, you can force the message to go to a temporary queue. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8e811235 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8e811235 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8e811235 Branch: refs/heads/master Commit: 8e8112357057b30bbe1c51edd878adb2cb2aae4e Parents: 8d6bae4 Author: Tyler BensonAuthored: Fri Nov 30 16:55:24 2012 -0800 Committer: Tyler Benson Committed: Fri Nov 30 16:55:24 2012 -0800 -- src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/8e811235/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java b/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java index a8c15c5..2ede2e7 100644 --- a/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java +++ b/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java @@ -138,7 +138,11 @@ public class JmsBolt extends BaseRichBolt { try { Message msg = this.producer.toMessage(this.session, input); if(msg != null){ - this.messageProducer.send(msg); + if (msg.getJMSDestination() != null) { + this.messageProducer.send(msg.getJMSDestination(), msg); + } else { + this.messageProducer.send(msg); + } } if(this.autoAck){ LOG.debug("ACKing tuple: " + input);
[07/50] [abbrv] storm git commit: [maven-release-plugin] prepare for next development iteration
[maven-release-plugin] prepare for next development iteration Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7fc8e934 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7fc8e934 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7fc8e934 Branch: refs/heads/master Commit: 7fc8e9342fa080f9c5a9055825c0542b6e4f00dc Parents: 489e2b5 Author: P. Taylor GoetzAuthored: Mon Jun 10 10:32:29 2013 -0400 Committer: P. Taylor Goetz Committed: Mon Jun 10 10:32:29 2013 -0400 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/7fc8e934/pom.xml -- diff --git a/pom.xml b/pom.xml index 5c2fc39..3f6cabb 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.8.1 + 0.8.2-SNAPSHOT Storm JMS Storm JMS Components
[10/50] [abbrv] storm git commit: Initial commit
Initial commit Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab474979 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab474979 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab474979 Branch: refs/heads/master Commit: ab4749798fb445798976330c7c9991aec47796fd Parents: 217acc9 Author: andy_lock_farmAuthored: Tue Jun 18 11:02:38 2013 +0100 Committer: andy_lock_farm Committed: Tue Jun 18 11:02:38 2013 +0100 -- .../backtype/storm/contrib/jms/JmsBatch.java| 10 + .../storm/contrib/jms/TridentJmsSpout.java | 391 +++ 2 files changed, 401 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ab474979/src/main/java/backtype/storm/contrib/jms/JmsBatch.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/JmsBatch.java b/src/main/java/backtype/storm/contrib/jms/JmsBatch.java new file mode 100644 index 000..45b2e03 --- /dev/null +++ b/src/main/java/backtype/storm/contrib/jms/JmsBatch.java @@ -0,0 +1,10 @@ +package backtype.storm.contrib.jms; + +/** + * Batch coordination metadata object for the TridentJmsSpout. + * This implementation does not use batch metadata, so the object is empty. 
+ * + */ +public class JmsBatch { +// Empty class +} http://git-wip-us.apache.org/repos/asf/storm/blob/ab474979/src/main/java/backtype/storm/contrib/jms/TridentJmsSpout.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/TridentJmsSpout.java b/src/main/java/backtype/storm/contrib/jms/TridentJmsSpout.java new file mode 100644 index 000..a543218 --- /dev/null +++ b/src/main/java/backtype/storm/contrib/jms/TridentJmsSpout.java @@ -0,0 +1,391 @@ +package backtype.storm.contrib.jms; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import storm.trident.operation.TridentCollector; +import storm.trident.spout.ITridentSpout; +import storm.trident.topology.TransactionAttempt; +import backtype.storm.Config; +import backtype.storm.generated.StreamInfo; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsGetter; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.RotatingMap; +import backtype.storm.utils.Utils; + +/** + * Trident implementation of the JmsSpout, based on code provided by P. 
Taylor Goetz - https://github.com/ptgoetz + * + * @author Andy Toone for Metabroadcast + * + */ +public class TridentJmsSpout implements ITridentSpout { + +public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; + +public static final int DEFAULT_BATCH_SIZE = 1000; + +private static final long serialVersionUID = -3469351154693356655L; + +private JmsTupleProducer tupleProducer; + +private JmsProvider jmsProvider; + +private int jmsAcknowledgeMode; + +private String name; + +private static int nameIndex = 1; + +/** + * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE + */ +public TridentJmsSpout() { +this.name = "JmsSpout_"+(nameIndex++); +this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; +} + +/** + * Set the name for this spout, to improve log identification + * @param name The name to be used in log messages + * @return This spout + */ +public TridentJmsSpout named(String name) { +this.name = name; +return this; +} + +/** + * Set the backtype.storm.contrib.jms.JmsProvider + * implementation that this Spout will use to connect to + * a JMS javax.jms.Desination + * + * @param provider + */ +public TridentJmsSpout withJmsProvider(JmsProvider provider){ +this.jmsProvider = provider; +return this; +} + +/** + * Set the backtype.storm.contrib.jms.JmsTupleProducer + * implementation that will convert javax.jms.Message + * object to backtype.storm.tuple.Values objects + * to be emitted. + * + * @param tupleProducer + * @return This spout + */ +public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) { +this.tupleProducer = tupleProducer; +return this; +} + +/** + * Set the
[30/50] [abbrv] storm git commit: rename packages
http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/JmsTupleProducer.java -- diff --git a/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java new file mode 100644 index 000..96b027d --- /dev/null +++ b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -0,0 +1,41 @@ +package org.apache.storm.jms; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Values; + +/** + * Interface to define classes that can produce a Storm Values objects + * from a javax.jms.Message object>. + * + * Implementations are also responsible for declaring the output + * fields they produce. + * + * If for some reason the implementation can't process a message + * (for example if it received a javax.jms.ObjectMessage + * when it was expecting a javax.jms.TextMessage it should + * return null to indicate to the JmsSpout that + * the message could not be processed. + * + * @author P. Taylor Goetz + * + */ +public interface JmsTupleProducer extends Serializable{ + /** +* Process a JMS message object to create a Values object. +* @param msg - the JMS message +* @return the Values tuple, or null if the message couldn't be processed. +* @throws JMSException +*/ + Values toTuple(Message msg) throws JMSException; + + /** +* Declare the output fields produced by this JmsTupleProducer. +* @param declarer The OuputFieldsDeclarer for the spout. 
+*/ + void declareOutputFields(OutputFieldsDeclarer declarer); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java -- diff --git a/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java new file mode 100644 index 000..896f932 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java @@ -0,0 +1,201 @@ +package org.apache.storm.jms.bolt; + +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.jms.JmsMessageProducer; +import org.apache.storm.jms.JmsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; + +/** + * A JmsBolt receives backtype.storm.tuple.Tuple objects from a Storm + * topology and publishes JMS Messages to a destination (topic or queue). + * + * To use a JmsBolt in a topology, the following must be supplied: + * + * A JmsProvider implementation. + * A JmsMessageProducer implementation. + * + * The JmsProvider provides the JMS javax.jms.ConnectionFactory + * and javax.jms.Destination objects requied to publish JMS messages. + * + * The JmsBolt uses a JmsMessageProducer to translate + * backtype.storm.tuple.Tuple objects into + * javax.jms.Message objects for publishing. + * + * Both JmsProvider and JmsMessageProducer must be set, or the bolt will + * fail upon deployment to a cluster. + * + * The JmsBolt is typically an endpoint in a topology -- in other words + * it does not emit any tuples. + * + * + * @author P. 
Taylor Goetz + * + */ +public class JmsBolt extends BaseRichBolt { + private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); + + private boolean autoAck = true; + + // javax.jms objects + private Connection connection; + private Session session; + private MessageProducer messageProducer; + + // JMS options + private boolean jmsTransactional = false; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + + private JmsProvider jmsProvider; + private JmsMessageProducer producer; + + + private OutputCollector collector; + + /** +* Set the JmsProvider used to connect to the JMS destination topic/queue +* @param provider +*/ + public void setJmsProvider(JmsProvider provider){ + this.jmsProvider = provider; + } + + /** +* Set the JmsMessageProducer used to convert tuples +* into JMS messages. +* +* @param producer +*/ + public void setJmsMessageProducer(JmsMessageProducer producer){ + this.producer = producer; + } + + /**
[34/50] [abbrv] storm git commit: bump version to 1.0.2
bump version to 1.0.2 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1ad1b516 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1ad1b516 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1ad1b516 Branch: refs/heads/master Commit: 1ad1b5168592c884403d9fcbfe6e041ab15e08db Parents: 0d2319c Author: P. Taylor GoetzAuthored: Thu Aug 25 16:14:33 2016 -0400 Committer: P. Taylor Goetz Committed: Thu Aug 25 16:14:33 2016 -0400 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/1ad1b516/pom.xml -- diff --git a/pom.xml b/pom.xml index aca9129..2e835f3 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.github.ptgoetz storm-jms - 0.9.2-SNAPSHOT + 1.0.2-SNAPSHOT Storm JMS Storm JMS Components
[41/50] [abbrv] storm git commit: integrate storm-jms with main build
integrate storm-jms with main build Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c116945a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c116945a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c116945a Branch: refs/heads/master Commit: c116945a37d03c3cb85bd6bd85f6315f9353543e Parents: a82f910 Author: P. Taylor GoetzAuthored: Wed Oct 5 13:14:17 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 13:14:17 2016 -0400 -- external/storm-jms/pom.xml | 138 pom.xml| 1 + 2 files changed, 57 insertions(+), 82 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c116945a/external/storm-jms/pom.xml -- diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml index 890a9ab..c381a7c 100644 --- a/external/storm-jms/pom.xml +++ b/external/storm-jms/pom.xml @@ -15,74 +15,56 @@ See the License for the specific language governing permissions and limitations under the License. --> -http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +storm +org.apache.storm +2.0.0-SNAPSHOT +../../pom.xml + - - org.sonatype.oss - oss-parent - 7 - - 4.0.0 - com.github.ptgoetz - storm-jms - 1.0.3-SNAPSHOT - Storm JMS - Storm JMS Components +storm-jms - - -The Apache Software License, Version 2.0 -http://www.apache.org/licenses/LICENSE-2.0.txt - - - - scm:git:g...@github.com:ptgoetz/storm-jms.git - scm:git:g...@github.com:ptgoetz/storm-jms.git - :g...@github.com:ptgoetz/storm-jms.git - - - - - ptgoetz - P. Taylor Goetz - ptgo...@gmail.com - - + + +ptgoetz +P. 
Taylor Goetz +ptgo...@gmail.com + + + + +org.apache.storm +storm-core +${project.version} + +provided + + +org.apache.geronimo.specs +geronimo-jms_1.1_spec +1.1.1 + + +junit +junit +4.10 +test + - - 1.0.2 - - - - org.apache.storm - storm-core - ${storm.version} - - provided - - - org.apache.geronimo.specs - geronimo-jms_1.1_spec - 1.1.1 - - - junit - junit - 4.10 - test - - - - - org.apache.activemq - activemq-core - 5.5.1 - test + + +org.apache.activemq +activemq-core +5.5.1 +test org.slf4j @@ -93,29 +75,21 @@ log4j - + - + - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.9 - - -Xdoclint:none - - - - + + + +org.apache.maven.plugins +maven-javadoc-plugin +2.9 + +-Xdoclint:none +
[48/50] [abbrv] storm git commit: storm-jms: import documentation
storm-jms: import documentation Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd22d743 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd22d743 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd22d743 Branch: refs/heads/master Commit: fd22d74317ab55426af2760478b2794a481db29b Parents: 96431b3 Author: P. Taylor GoetzAuthored: Wed Oct 5 16:34:46 2016 -0400 Committer: P. Taylor Goetz Committed: Wed Oct 5 16:34:46 2016 -0400 -- docs/images/Storm-JMS-Example.png | Bin 0 -> 40432 bytes docs/index.md | 1 + docs/storm-jms-example.md | 111 + docs/storm-jms-spring.md | 25 docs/storm-jms.md | 33 ++ 5 files changed, 170 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/fd22d743/docs/images/Storm-JMS-Example.png -- diff --git a/docs/images/Storm-JMS-Example.png b/docs/images/Storm-JMS-Example.png new file mode 100644 index 000..80e3493 Binary files /dev/null and b/docs/images/Storm-JMS-Example.png differ http://git-wip-us.apache.org/repos/asf/storm/blob/fd22d743/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index 7effdde..f951adf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -73,6 +73,7 @@ Trident is an alternative interface to Storm. 
It provides exactly-once processin * [Apache Solr Integration](storm-solr.html) * [Apache Cassandra Integration](storm-cassandra.html) * [JDBC Integration](storm-jdbc.html) +* [JMS Integration](storm-jms.html) * [Redis Integration](storm-redis.html) * [Event Hubs Intergration](storm-eventhubs.html) * [Elasticsearch Integration](storm-elasticsearch.html) http://git-wip-us.apache.org/repos/asf/storm/blob/fd22d743/docs/storm-jms-example.md -- diff --git a/docs/storm-jms-example.md b/docs/storm-jms-example.md new file mode 100644 index 000..29cadf1 --- /dev/null +++ b/docs/storm-jms-example.md @@ -0,0 +1,111 @@ +--- +title: Storm JMS Integration +layout: documentation +documentation: true +--- +## Example Storm JMS Topology + +The storm-jms source code contains an example project (in the "examples" directory) +that builds a multi-bolt/multi-spout topology (depicted below) that uses the JMS Spout and JMS Bolt components. + +![picture alt](images/Storm-JMS-Example.png "Example JMS Topology") + +The green components represent instances of the storm-jms components. White components represent +"standard" Storm bolts (in the example these bolts are instances of `GenericBolt` which simply logs +information about the tuples it receives and emits). + +Grey arrows represent JMS messages, while black arrows represent the flow of Storm tuple objects. + +### JMS Transactions and Guaranteed Processing +The example is set up to be "transactional," meaning the JMS Spout will use Storm's guaranteed +processing capabilities to determine if a JMS Message should be acknowledged. Each bolt in the +topology will anchor to each tuple it receives. If every bolt successfully processes and acks +each tuple in the chain, the original JMS Message will be acknowledged, and the underlying +JMS implementation will not attempt re-delivery of the message. If a bolt fails to process/ack +a tuple, the JMS message will not be acknowledged, and the JMS implementation will queue the +message for redelivery. 
+ +### Data Flow +The topology contains two chains: One originating from a JMS Spout connected to a Queue, and +another originating from a JMS Spout connected to a Topic. + +**Chain #1** + +1. The "JMS Queue Spout" receives a JMS Message object from the queue, and emits a +tuple to the "Intermediate Bolt" +2. The "Intermediate Bolt" emits a tuple to the "Final Bolt" and the "JMS Topic Bolt", and acks +the tuple it received. +3. The "Final Bolt" receives the tuple and simply acks it; it does not emit anything. +4. The "JMS Topic Bolt" receives a tuple, constructs a JMS Message from the tuple's values, +and publishes the message to a JMS Topic. +5. If the "JMS Topic Bolt" successfully publishes the JMS message, it will ack the tuple. +6. The "JMS Queue Spout" will receive notification if all bolts in the chain have acked +and acknowledge the original JMS Message. If one or more bolts in the chain fail to ack a tuple, the +"JMS Queue Spout" will not acknowledge the JMS message. + +**Chain #2** + +1. The "JMS Topic Spout" receives a JMS message from the topic and emits a tuple to "Another Bolt." +2. The "Another Bolt" receives and acks the tuple. +3. The "JMS Topic Spout"
[08/50] [abbrv] storm git commit: Added Clojars repository to pom for compile
Added Clojars repository to pom for compile Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2a02f9fd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2a02f9fd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2a02f9fd Branch: refs/heads/master Commit: 2a02f9fdace05317cc3c878a2b2f7a7fe4f060eb Parents: dd4e5e6 Author: andy_lock_farmAuthored: Tue Jun 18 09:27:35 2013 +0100 Committer: andy_lock_farm Committed: Tue Jun 18 09:27:35 2013 +0100 -- pom.xml | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/2a02f9fd/pom.xml -- diff --git a/pom.xml b/pom.xml index f4db48b..5702bc3 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,16 @@ 5.5.1 test + + + + + clojars.org + http://clojars.org/repo + + +
[26/50] [abbrv] storm git commit: move session.commit() call to State.commit() method
move session.commit() call to State.commit() method Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91dddb2b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91dddb2b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91dddb2b Branch: refs/heads/master Commit: 91dddb2b653cfcd79ee2b4bef5e784cdf28486cd Parents: a9e7339 Author: P. Taylor GoetzAuthored: Fri Aug 15 15:33:40 2014 -0400 Committer: P. Taylor Goetz Committed: Fri Aug 15 15:33:40 2014 -0400 -- .../storm/contrib/jms/trident/JmsState.java | 13 +++-- storm-jms.iml | 52 ++-- 2 files changed, 34 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/91dddb2b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java index 5f0bc58..671f0f0 100644 --- a/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java +++ b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java @@ -95,12 +95,18 @@ public class JmsState implements State { @Override public void beginCommit(Long aLong) { -LOG.debug("beginCommit is noop."); } @Override public void commit(Long aLong) { -LOG.debug("commit is noop."); +LOG.debug("Committing JMS transaction."); +if(this.options.jmsTransactional) { +try { +session.commit(); +} catch(JMSException e){ +LOG.error("JMS Session commit failed.", e); +} +} } public void updateState(List tuples, TridentCollector collector) throws JMSException { @@ -122,8 +128,5 @@ public class JmsState implements State { } throw new FailedException("Failed to write tuples", e); } -if(this.options.jmsTransactional) { -session.commit(); -} } } http://git-wip-us.apache.org/repos/asf/storm/blob/91dddb2b/storm-jms.iml -- diff --git a/storm-jms.iml b/storm-jms.iml index 4554773..ac63ba4 100644 --- a/storm-jms.iml +++ b/storm-jms.iml @@ -3,43 +3,27 @@ - - + - - 
- - - - - - - - - + + - - - - - - - - - + + + @@ -50,20 +34,36 @@ - - + + + + + + + + + + + + + + - + + + - + + +
[29/50] [abbrv] storm git commit: upgrade storm version
upgrade storm version Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ddc3a47b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ddc3a47b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ddc3a47b Branch: refs/heads/master Commit: ddc3a47b2f4b5f7c8c7b35efc2c62e48f1fca939 Parents: 3fc7f30 Author: P. Taylor GoetzAuthored: Sun Mar 8 01:37:55 2015 -0500 Committer: P. Taylor Goetz Committed: Sun Mar 8 01:37:55 2015 -0500 -- .gitignore | 1 + pom.xml | 2 +- .../storm/contrib/jms/trident/JmsState.java | 3 - storm-jms.iml | 85 4 files changed, 2 insertions(+), 89 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ddc3a47b/.gitignore -- diff --git a/.gitignore b/.gitignore index 7c73dad..faebb16 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.ipr *.iws +*.iml target/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/ddc3a47b/pom.xml -- diff --git a/pom.xml b/pom.xml index 235fbdc..aa87b75 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ - 0.9.2-incubating + 0.9.3 http://git-wip-us.apache.org/repos/asf/storm/blob/ddc3a47b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java -- diff --git a/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java index 6b7fdfb..2a285b4 100644 --- a/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java +++ b/src/main/java/backtype/storm/contrib/jms/trident/JmsState.java @@ -18,10 +18,7 @@ package backtype.storm.contrib.jms.trident; import backtype.storm.contrib.jms.JmsProvider; -import backtype.storm.contrib.jms.trident.TridentJmsMessageProducer; import backtype.storm.topology.FailedException; -import backtype.storm.tuple.Values; -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.trident.operation.TridentCollector; 
http://git-wip-us.apache.org/repos/asf/storm/blob/ddc3a47b/storm-jms.iml -- diff --git a/storm-jms.iml b/storm-jms.iml deleted file mode 100644 index ac63ba4..000 --- a/storm-jms.iml +++ /dev/null @@ -1,85 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -