JIRA ticket created: https://issues.apache.org/jira/browse/NIFI-5360
From: Kelsey RIDER <[email protected]>
Sent: Friday, June 29, 2018 09:12
To: [email protected]
Subject: RE: simple JMS pub-sub not working

Hi Mark,

I see what you’re saying, but conceptually, it doesn’t seem like a good idea to me. From what I can gather, the purpose of Controller Services is to have a shared component that deals with external special cases (like DB connections). From my point of view (which is admittedly that of a newb) it seems like:

1. The JMSConnectionFactoryProvider should handle the special case of dealing with a specific external system. It should expose a method that allows the Processor to request a FlowFile fetched from a queue. This would be analogous to a DB – if I want to fetch rows from, say, a PostgreSQL DB, the Controller Service handles all the Postgres-specific stuff (including the Driver library), and the Processor just gets a FlowFile. (Edit – looked at the code and realized it doesn’t work like this. But then why don’t we have classpath issues when fetching from a DB?)

2. Adding the MQ libraries to the Processor would be redundant (it goes against DRY), because they’re already specified on the JMSConnectionFactoryProvider.

I can certainly open a JIRA ticket – what do I need to do to sign up for an account? And if I get some time I wouldn’t mind trying to implement something like what I just described.

Kelsey

From: Mark Payne <[email protected]>
Sent: Thursday, June 28, 2018 17:20
To: [email protected]
Subject: Re: simple JMS pub-sub not working

Thanks Kelsey.

So it appears that when using RabbitMQ, it's attempting to access the RabbitMQ classes in a call from the ConsumeJMS processor, not from within the ConnectionFactory. I think we may end up needing to include a new property in ConsumeJMS / PublishJMS that optionally allows you to specify the libraries to use for those processors, as well.
I.e., in addition to needing the libraries in the Connection Factory Controller Service, for some vendors we will need to also provide the libraries to the Processor. Fortunately, it should be an easy enough change to make, if that's what is necessary. We would basically just need to create a Property Descriptor in the processor that is similar to the one in the Controller Service:

    public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
            .name(CF_LIB)
            .displayName("MQ Client Libraries path (i.e., /usr/jms/lib)")
            .description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added "
                    + "to the classpath. Such resources typically represent target MQ client libraries for the "
                    + "ConnectionFactory implementation.")
            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
            .required(true)
            .dynamicallyModifiesClasspath(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

And then return that in the processor's list of properties. The Processor itself doesn't actually need to do anything with that property, because the framework will detect the ".dynamicallyModifiesClasspath(true)" part and handle it for us.

If you're the type who is inclined to create a JIRA and PR, I'd be happy to review it! Or if you are able to even just test it out and validate that this solves the issue, then that would be helpful as well.
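To picture what a classpath-modifying property amounts to, here is a minimal plain-JDK sketch: collect the JARs in a configured directory and build a class loader that can see them. This is not NiFi's implementation and makes no claim about how the framework does it internally; the class name `JarDirectoryClassLoaderSketch` and its helper methods are hypothetical, and only the example path /usr/jms/lib comes from the property's display name.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; NiFi's own handling of
// dynamicallyModifiesClasspath(true) is not reproduced here.
class JarDirectoryClassLoaderSketch {

    /** Collects the URLs of all *.jar files located directly inside libDir. */
    static List<URL> jarUrls(Path libDir) throws IOException {
        List<URL> urls = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        }
        return urls;
    }

    /** Builds a class loader that sees the JARs in libDir plus everything the parent sees. */
    static URLClassLoader loaderFor(Path libDir, ClassLoader parent) throws IOException {
        return new URLClassLoader(jarUrls(libDir).toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws IOException {
        // Directory taken from the first argument, falling back to the example path.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "/usr/jms/lib");
        if (!Files.isDirectory(libDir)) {
            System.out.println("Not a directory: " + libDir);
            return;
        }
        ClassLoader parent = JarDirectoryClassLoaderSketch.class.getClassLoader();
        try (URLClassLoader loader = loaderFor(libDir, parent)) {
            for (URL url : loader.getURLs()) {
                System.out.println(url);
            }
        }
    }
}
```

Code that runs with such a loader can resolve the vendor classes; code that runs with a loader built without those JARs cannot, which is why the same directory may have to be configured on both the Controller Service and the Processor.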
Thanks
-Mark

On Jun 28, 2018, at 11:02 AM, Kelsey RIDER <[email protected]> wrote:

Hello Mark,

Here you go:

2018-06-28 15:47:27,161 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.jms.processors.ConsumeJMS ConsumeJMS[id=01641047-1441-10a4-ce22-23d392689dd9] ConsumeJMS[id=01641047-1441-10a4-ce22-23d392689dd9] failed to process session due to org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage; Processor Administratively Yielded for 1 sec: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
    at org.apache.nifi.jms.processors.JMSConsumer.consume(JMSConsumer.java:85)
    at org.apache.nifi.jms.processors.ConsumeJMS.rendezvousWithJms(ConsumeJMS.java:181)
    at org.apache.nifi.jms.processors.ConsumeJMS.rendezvousWithJms(ConsumeJMS.java:59)
    at org.apache.nifi.jms.processors.AbstractJMSProcessor.onTrigger(AbstractJMSProcessor.java:157)
    at org.apache.nifi.jms.processors.ConsumeJMS.onTrigger(ConsumeJMS.java:59)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1147)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:175)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.jms.util.RMQJMSException: com.rabbitmq.jms.client.message.RMQTextMessage
    at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1070)
    at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1030)
    at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:871)
    at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:865)
    at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:331)
    at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:246)
    at org.apache.nifi.jms.processors.JMSConsumer$1.doInJms(JMSConsumer.java:95)
    at org.apache.nifi.jms.processors.JMSConsumer$1.doInJms(JMSConsumer.java:85)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494)
    ... 16 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.jms.client.message.RMQTextMessage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1064)
    ... 24 common frames omitted

From: Mark Payne <[email protected]>
Sent: Thursday, June 28, 2018 16:40
To: [email protected]
Subject: Re: simple JMS pub-sub not working

Hi Kelsey,

Can you provide the entire stack trace? What you've given here is just a "Caused by" portion of it.

Thanks
-Mark

On Jun 28, 2018, at 9:50 AM, Kelsey RIDER <[email protected]> wrote:

Hello again,

I’m trying to set up a very simple round-trip JMS test. I have a RabbitMQ server set up and installed locally. I set up a simple NiFi flow that generates a FlowFile and uses PublishJMS to send it to the queue. I then have a ConsumeJMS processor that tries to consume what I just published.

The problem is that the consumer always dies with the following exception:

Caused by: java.lang.ClassNotFoundException: com.rabbitmq.jms.client.message.RMQTextMessage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.rabbitmq.jms.client.RMQMessage.instantiateRmqMessage(RMQMessage.java:1064)

Both the publisher and the consumer use the same JMSConnectionFactoryProvider, which is configured with the appropriate libraries and is successfully communicating with RabbitMQ (I can see the published messages from the RabbitMQ admin console).

Why doesn’t ConsumeJMS use the libraries configured on the JMS controller service? How could I get this to work?

I tried dropping the required JARs in lib/ but this led to the JMS controller service not starting (I don’t recall the exception). Dropping them in lib/bootstrap/ didn’t have any effect.
Thanks,
Kelsey
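For readers wondering how a class can be missing for the processor while the controller service has the right JARs, here is a small JDK-only sketch of class loader visibility. The trace shows the lookup going through Class.forName inside RMQMessage.instantiateRmqMessage, but not which class loader the RabbitMQ client passes to it; the use of the thread context class loader below is an assumption made for the demo, and `ContextClassLoaderDemo` and `VendorMessage` are hypothetical stand-ins.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo; it does not use NiFi or the RabbitMQ client.
class ContextClassLoaderDemo {

    /** Stand-in for a vendor message class such as RMQTextMessage. */
    static class VendorMessage {}

    /** Mimics a library that resolves a class by name via the thread context class loader (assumed behaviour). */
    static boolean loadViaContextClassLoader(String className) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        try {
            Class.forName(className, false, contextLoader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Runs the lookup with the given loader installed as context class loader, then restores the previous one. */
    static boolean lookupWith(ClassLoader loader, String className) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return loadViaContextClassLoader(className);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        String name = VendorMessage.class.getName();
        // Loader that defined VendorMessage: plays the role of the side that has the vendor JARs.
        ClassLoader withVendorClasses = VendorMessage.class.getClassLoader();
        // Loader with no URLs and only the bootstrap loader as parent: plays the side without them.
        try (URLClassLoader withoutVendorClasses = new URLClassLoader(new URL[0], null)) {
            System.out.println("with vendor classes:    " + lookupWith(withVendorClasses, name));
            System.out.println("without vendor classes: " + lookupWith(withoutVendorClasses, name));
        }
    }
}
```

The first lookup succeeds and the second one fails with a ClassNotFoundException (reported as false): the same class name resolves or not depending only on which loader is in effect on the calling thread.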
