[1/2] nifi git commit: NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS
Repository: nifi Updated Branches: refs/heads/0.x 639e6d6a7 -> c2e98f96e NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS Remove unused assertEquals import Move destination from default to send/receive to support EL better Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58fdfdc7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58fdfdc7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58fdfdc7 Branch: refs/heads/0.x Commit: 58fdfdc76e2e59bffbbcdd4e96ad2a190b412b29 Parents: 639e6d6 Author: Joey Frazee Authored: Mon Sep 19 08:18:30 2016 -0500 Committer: Oleg Zhurakousky Committed: Tue Sep 20 10:11:27 2016 -0400 -- .../jms/processors/AbstractJMSProcessor.java| 2 +- .../apache/nifi/jms/processors/ConsumeJMS.java | 15 --- .../apache/nifi/jms/processors/JMSConsumer.java | 4 +- .../nifi/jms/processors/JMSPublisher.java | 10 ++--- .../apache/nifi/jms/processors/PublishJMS.java | 3 +- .../apache/nifi/jms/processors/CommonTest.java | 3 +- .../nifi/jms/processors/ConsumeJMSTest.java | 32 --- .../processors/JMSPublisherConsumerTest.java| 31 --- .../nifi/jms/processors/PublishJMSTest.java | 42 ++-- 9 files changed, 101 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java -- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index f8030db..ed45b84 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -198,7 +198,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue()); +this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java -- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index a4cad0d..e8e0eb9 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -76,7 +76,8 @@ public class ConsumeJMS extends AbstractJMSProcessor { */ @Override protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { -final JMSResponse response = this.targetResource.consume(); +final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); +final JMSResponse response = this.targetResource.consume(destinationName); if (response != null){ FlowFile flowFile = processSession.create(); flowFile = processSession.write(flowFile, new OutputStreamCallback() { @@ -86,8 +87,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor {
}
});
Map jmsHeaders = response.getMessageHeaders();
-flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
-processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).getValue());
+Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties());
+flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession);
+flowFile = th
[1/2] nifi git commit: NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS
Repository: nifi Updated Branches: refs/heads/master feaa4c9db -> b693a4a56 NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS Remove unused assertEquals import Move destination from default to send/receive to support EL better Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2386760 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2386760 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2386760 Branch: refs/heads/master Commit: c23867605857f8380140ba4d69437b58585e9cba Parents: feaa4c9 Author: Joey Frazee Authored: Mon Sep 19 08:18:30 2016 -0500 Committer: Oleg Zhurakousky Committed: Tue Sep 20 09:30:14 2016 -0400 -- .../jms/processors/AbstractJMSProcessor.java| 1 - .../apache/nifi/jms/processors/ConsumeJMS.java | 13 +++--- .../apache/nifi/jms/processors/JMSConsumer.java | 4 +- .../nifi/jms/processors/JMSPublisher.java | 10 ++--- .../apache/nifi/jms/processors/PublishJMS.java | 3 +- .../apache/nifi/jms/processors/CommonTest.java | 3 +- .../nifi/jms/processors/ConsumeJMSTest.java | 21 +++--- .../processors/JMSPublisherConsumerTest.java| 30 +++--- .../nifi/jms/processors/PublishJMSTest.java | 42 ++-- 9 files changed, 89 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java -- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 20937b5..d7c40f7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -202,7 +202,6 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); -jmsTemplate.setDefaultDestinationName(this.destinationName); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java -- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 83e594a..cdd5fcd 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -78,7 +78,8 @@ public class ConsumeJMS extends AbstractJMSProcessor { */ @Override protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { -final JMSResponse response = this.targetResource.consume(); +final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); +final JMSResponse response = this.targetResource.consume(destinationName); if (response != null){ FlowFile flowFile = processSession.create(); flowFile = processSession.write(flowFile, new OutputStreamCallback() { @@ -88,7 +89,9 @@ public class ConsumeJMS extends AbstractJMSProcessor { } 
});
Map jmsHeaders = response.getMessageHeaders();
-flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession);
+Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties());
+flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession);
+flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile,