[GitHub] [nifi] jfrazee commented on a change in pull request #4352: NIFI-7563 Optimize the usage of JMS sessions and message producers

2020-06-29 Thread GitBox


jfrazee commented on a change in pull request #4352:
URL: https://github.com/apache/nifi/pull/4352#discussion_r447314728



##
File path: 
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ConnectionFactory}'s invocation handler to be used to create {@link 
Proxy} instances. This handler stores useful information to validate the proper 
resources handling of underlying
+ * connection factory.
+ */
+@ThreadSafe
+public final class ConnectionFactoryInvocationHandler implements 
InvocationHandler {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class);
+
+private final ConnectionFactory connectionFactory;
+private final List handlers = new 
CopyOnWriteArrayList<>();
+private final AtomicInteger openedConnections = new AtomicInteger();
+
+public ConnectionFactoryInvocationHandler(ConnectionFactory 
connectionFactory) {
+this.connectionFactory = Objects.requireNonNull(connectionFactory);
+}
+
+@Override
+public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+final Object o = 
connectionFactory.getClass().getMethod(method.getName(), 
method.getParameterTypes()).invoke(connectionFactory, args);
+LOGGER.debug("Method {} called on {}", method.getName(), 
connectionFactory);

Review comment:
   Minor nit. I know this is only used for a test, but can we wrap this in 
`LOGGER.isDebugEnabled()`? Same for `ConnectionInvocationHandler`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] jfrazee commented on a change in pull request #4352: NIFI-7563 Optimize the usage of JMS sessions and message producers

2020-06-29 Thread GitBox


jfrazee commented on a change in pull request #4352:
URL: https://github.com/apache/nifi/pull/4352#discussion_r447314098



##
File path: 
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
##
@@ -385,6 +390,66 @@ protected TcpTransport createTcpTransport(WireFormat wf, 
SocketFactory socketFac
 }
 }
 
+/**
+ * 
+ * This test validates the optimal resources usage. To process one message 
is expected to create only one connection, one session and one message producer.
+ * 
+ * 
+ * See https://issues.apache.org/jira/browse/NIFI-7563 for 
details.
+ * 
+ * @throws Exception any error related to the broker.
+ */
+@Test(timeout = 1)
+public void validateNIFI7563() throws Exception {
+BrokerService broker = new BrokerService();
+try {
+broker.setPersistent(false);
+TransportConnector connector = 
broker.addConnector("tcp://127.0.0.1:0");
+int port = connector.getServer().getSocketAddress().getPort();
+broker.start();
+
+final ActiveMQConnectionFactory innerCf = new 
ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+ConnectionFactoryInvocationHandler connectionFactoryProxy = new 
ConnectionFactoryInvocationHandler(innerCf);
+
+// Create a connection Factory proxy to catch metrics and usage.
+ConnectionFactory cf = (ConnectionFactory) 
Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { 
ConnectionFactory.class }, connectionFactoryProxy);
+
+TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+when(cs.getIdentifier()).thenReturn("cfProvider");
+when(cs.getConnectionFactory()).thenReturn(cf);
+runner.addControllerService("cfProvider", cs);
+runner.enableControllerService(cs);
+
+runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+String destinationName = "myDestinationName";
+// The destination option according current implementation should 
contain topic or queue to infer the destination type
+// from the name. Check 
https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name 
can be
+// randomly created.
+String topicNameInHeader = "topic-foo";
+runner.setProperty(PublishJMS.DESTINATION, destinationName);
+runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+Map flowFileAttributes = new HashMap<>();
+// This method will be removed once 
https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+runner.enqueue("hi".getBytes(), flowFileAttributes);
+runner.enqueue("h2".getBytes(), flowFileAttributes);
+runner.setThreadCount(1);

Review comment:
   Could we add a test that tests the scenario with more threads? I believe 
your comment in the JIRA is accurate but I think the threading is the only real 
concern in this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org