[GitHub] [nifi] jfrazee commented on a change in pull request #4352: NIFI-7563 Optimize the usage of JMS sessions and message producers
jfrazee commented on a change in pull request #4352: URL: https://github.com/apache/nifi/pull/4352#discussion_r447314728 ## File path: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.helpers; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.concurrent.ThreadSafe; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ConnectionFactory}'s invocation handler to be used to create {@link Proxy} instances. This handler stores useful information to validate the proper resources handling of underlying + * connection factory. + */ +@ThreadSafe +public final class ConnectionFactoryInvocationHandler implements InvocationHandler { + +private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class); + +private final ConnectionFactory connectionFactory; +private final List handlers = new CopyOnWriteArrayList<>(); +private final AtomicInteger openedConnections = new AtomicInteger(); + +public ConnectionFactoryInvocationHandler(ConnectionFactory connectionFactory) { +this.connectionFactory = Objects.requireNonNull(connectionFactory); +} + +@Override +public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { +final Object o = connectionFactory.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(connectionFactory, args); +LOGGER.debug("Method {} called on {}", method.getName(), connectionFactory); Review comment: Minor nit. I know this is only used for a test, but can we wrap this in `LOGGER.isDebugEnabled()`? Same for `ConnectionInvocationHandler`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] jfrazee commented on a change in pull request #4352: NIFI-7563 Optimize the usage of JMS sessions and message producers
jfrazee commented on a change in pull request #4352: URL: https://github.com/apache/nifi/pull/4352#discussion_r447314098 ## File path: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java ## @@ -385,6 +390,66 @@ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFac } } +/** + * + * This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer. + * + * + * See https://issues.apache.org/jira/browse/NIFI-7563 for details. + * + * @throws Exception any error related to the broker. + */ +@Test(timeout = 1) +public void validateNIFI7563() throws Exception { +BrokerService broker = new BrokerService(); +try { +broker.setPersistent(false); +TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0"); +int port = connector.getServer().getSocketAddress().getPort(); +broker.start(); + +final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port); +ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf); + +// Create a connection Factory proxy to catch metrics and usage. +ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy); + +TestRunner runner = TestRunners.newTestRunner(new PublishJMS()); +JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); +when(cs.getIdentifier()).thenReturn("cfProvider"); +when(cs.getConnectionFactory()).thenReturn(cf); +runner.addControllerService("cfProvider", cs); +runner.enableControllerService(cs); + +runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + +String destinationName = "myDestinationName"; +// The destination option according current implementation should contain topic or queue to infer the destination type +// from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be +// randomly created. +String topicNameInHeader = "topic-foo"; +runner.setProperty(PublishJMS.DESTINATION, destinationName); +runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE); + +Map flowFileAttributes = new HashMap<>(); +// This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed. +flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader); +flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader); +runner.enqueue("hi".getBytes(), flowFileAttributes); +runner.enqueue("h2".getBytes(), flowFileAttributes); +runner.setThreadCount(1); Review comment: Could we add a test that tests the scenario with more threads? I believe your comment in the JIRA is accurate but I think the threading is the only real concern in this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org