[1/2] nifi git commit: NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS

2016-09-20 Thread ozhurakousky
Repository: nifi
Updated Branches:
  refs/heads/0.x 639e6d6a7 -> c2e98f96e


NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in 
ConsumeJMS

Remove unused assertEquals import

Move destination from default to send/receive to support EL better


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58fdfdc7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58fdfdc7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58fdfdc7

Branch: refs/heads/0.x
Commit: 58fdfdc76e2e59bffbbcdd4e96ad2a190b412b29
Parents: 639e6d6
Author: Joey Frazee 
Authored: Mon Sep 19 08:18:30 2016 -0500
Committer: Oleg Zhurakousky 
Committed: Tue Sep 20 10:11:27 2016 -0400

--
 .../jms/processors/AbstractJMSProcessor.java|  2 +-
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 15 ---
 .../apache/nifi/jms/processors/JMSConsumer.java |  4 +-
 .../nifi/jms/processors/JMSPublisher.java   | 10 ++---
 .../apache/nifi/jms/processors/PublishJMS.java  |  3 +-
 .../apache/nifi/jms/processors/CommonTest.java  |  3 +-
 .../nifi/jms/processors/ConsumeJMSTest.java | 32 ---
 .../processors/JMSPublisherConsumerTest.java| 31 ---
 .../nifi/jms/processors/PublishJMSTest.java | 42 ++--
 9 files changed, 101 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
--
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index f8030db..ed45b84 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -198,7 +198,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 
 JmsTemplate jmsTemplate = new JmsTemplate();
 jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
-
jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue());
+this.destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
 
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
 // set of properties that may be good candidates for exposure via 
configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
--
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index a4cad0d..e8e0eb9 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -76,7 +76,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
  */
 @Override
 protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-final JMSResponse response = this.targetResource.consume();
+final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
+final JMSResponse response = 
this.targetResource.consume(destinationName);
 if (response != null){
 FlowFile flowFile = processSession.create();
 flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
@@ -86,8 +87,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
 }
 });
 Map jmsHeaders = response.getMessageHeaders();
-flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, 
flowFile, processSession);
-processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).getValue());
+Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties());
+flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
+flowFile = th

[1/2] nifi git commit: NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS

2016-09-20 Thread ozhurakousky
Repository: nifi
Updated Branches:
  refs/heads/master feaa4c9db -> b693a4a56


NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in 
ConsumeJMS

Remove unused assertEquals import

Move destination from default to send/receive to support EL better


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2386760
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2386760
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2386760

Branch: refs/heads/master
Commit: c23867605857f8380140ba4d69437b58585e9cba
Parents: feaa4c9
Author: Joey Frazee 
Authored: Mon Sep 19 08:18:30 2016 -0500
Committer: Oleg Zhurakousky 
Committed: Tue Sep 20 09:30:14 2016 -0400

--
 .../jms/processors/AbstractJMSProcessor.java|  1 -
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 13 +++---
 .../apache/nifi/jms/processors/JMSConsumer.java |  4 +-
 .../nifi/jms/processors/JMSPublisher.java   | 10 ++---
 .../apache/nifi/jms/processors/PublishJMS.java  |  3 +-
 .../apache/nifi/jms/processors/CommonTest.java  |  3 +-
 .../nifi/jms/processors/ConsumeJMSTest.java | 21 +++---
 .../processors/JMSPublisherConsumerTest.java| 30 +++---
 .../nifi/jms/processors/PublishJMSTest.java | 42 ++--
 9 files changed, 89 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
--
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 20937b5..d7c40f7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -202,7 +202,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 JmsTemplate jmsTemplate = new JmsTemplate();
 jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
 this.destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
-jmsTemplate.setDefaultDestinationName(this.destinationName);
 
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
 // set of properties that may be good candidates for exposure via 
configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
--
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 83e594a..cdd5fcd 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -78,7 +78,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
  */
 @Override
 protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-final JMSResponse response = this.targetResource.consume();
+final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
+final JMSResponse response = 
this.targetResource.consume(destinationName);
 if (response != null){
 FlowFile flowFile = processSession.create();
 flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
@@ -88,7 +89,9 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
 }
 });
 Map jmsHeaders = response.getMessageHeaders();
-flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, 
flowFile, processSession);
+Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties());
+flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
+flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, 
flowFile, processSession);
 processSession.getProvenanceReporter().receive(flowFile,