Hi Jude (and all),

After your suggestion I started working on it. I wrote my JMS Provider,
that instantiates the ConnectionFactory & Destination. I am following the
storm-jms-example, and I wish to read from a IBM MQ queue, do some
processing, then push to some other queue. I have IBM MQ 8.0.05 installed
on my Windows 10 machine. Here are the snippets

*IBMMQJMSProvider.java*

public class IBMMQJMSProvider implements JmsProvider {
private static final long serialVersionUID = 1L;
private ConnectionFactory connectionFactory;
private Destination destination;

@Override
public ConnectionFactory connectionFactory() throws Exception {
return connectionFactory;
}

@Override
public Destination destination() throws Exception {
return destination;
}
@SuppressWarnings("unchecked")
public IBMMQJMSProvider(String topicName) throws JMSException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
Class<MQQueueConnectionFactory> clazz =
(Class<MQQueueConnectionFactory>)Class.forName("com.ibm.mq.jms.MQQueueConnectionFactory");
MQQueueConnectionFactory mqConnectionFactory = clazz.newInstance();

mqConnectionFactory.setHostName("localhost");
mqConnectionFactory.setPort(1414);
mqConnectionFactory.setChannel("channel_svr_conn");
mqConnectionFactory.setQueueManager("qm_test");
mqConnectionFactory.setTransportType(1);
mqConnectionFactory.setAppName("MqInMqOut" + topicName);
// Not secured on local - Not secured
//mqConnectionFactory.setSSLCipherSuite("");
connectionFactory = mqConnectionFactory;
JmsFactoryFactory jmsFact =
JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
JmsQueue jmsQueue = jmsFact.createQueue(topicName);
destination = jmsQueue;
}

}

And here is my topology builder

        // JMS Queue Provider
        JmsProvider jmsInTopicProvider = new IBMMQJMSProvider("q_test_in");

        // JMS Producer
        JmsTupleProducer producer = new JsonTupleProducer();

        // JMS Queue Spout
        JmsSpout queueSpout = new JmsSpout();
        queueSpout.setJmsProvider(jmsInTopicProvider);
        queueSpout.setJmsTupleProducer(producer);
        queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        queueSpout.setDistributed(true); // allow multiple instances

        // JMS Out Topic provider
        JmsProvider jmsOutTopicProvider = new
IBMMQJMSProvider("q_test_out");
        JmsBolt jmsOutBolt = new JmsBolt();
        jmsOutBolt.setJmsProvider(jmsOutTopicProvider);

        // anonymous message producer just calls toString() on the tuple to
create a jms message
        jmsOutBolt.setJmsMessageProducer(new JmsMessageProducer() {
            @Override
            public Message toMessage(Session session, ITuple input) throws
JMSException {
                System.out.println("Sending JMS Message:" +
input.toString());
                TextMessage tm =
session.createTextMessage(input.toString());
                return tm;
            }
        });

        // Build JMS Topology
        TopologyBuilder builder = new TopologyBuilder();
        // spout with 5 parallel instances
        builder.setSpout(JMS_QUEUE_IN_SPOUT, queueSpout, 5);
        // intermediate bolt, subscribes to jms spout, anchors on tuples,
and auto-acks
        builder.setBolt(INTERMEDIATE_BOLT, new
GenericBolt(INTERMEDIATE_BOLT, true, true, new Fields("json")),
3).shuffleGrouping(JMS_QUEUE_IN_SPOUT);
        // Push to another topic
        builder.setBolt(JMS_TOPIC_OUT_BOLT,
jmsOutBolt).shuffleGrouping(INTERMEDIATE_BOLT);

But when I run this topology, I face following issues

   1. On startup, i see error as following while connecting to IBM MQ
   server, it seems to be related to some authorization, but I am not able to
   find any log file in IBM MQ folder to analyze it further.
   com.ibm.msg.client.jms.DetailedJMSSecurityException: JMSWMQ2013: The
   security authentication was not valid that was supplied for QueueManager
   'qm_test' with connection mode 'Client' and host name 'localhost(1414)'.
   ...
   Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed
   with compcode '2' ('MQCC_FAILED') reason '2035' ('MQRC_NOT_AUTHORIZED').
   ....
   2. If I push any message in source queue, the Bolt does not processes
   it, though logs roll after some time, but the messages that I push is not
   shown in logs.


It seems to be some connection issue with IBM MQ server, any help on
establishing the connection and processing the pushed messages will really
be helpful. Please let me know if something is missing in my implementation
for achieving this.

Thanks,
-Keshav


On Tue, Sep 18, 2018 at 7:59 PM Jude Huang Zhipeng <[email protected]>
wrote:

> Hi Keshav,
>
> I integrated Apache Storm with IBM MQ before, the solution was to get IBM
> MQ client library  (e.g. mqjms.jar) from MQ server or your company repo,
> and then write customized IBM MQ Spout and final Bolt. Hope this helps!
>
> regards,
> Jude
>
> 1904labs@St. Louis
>
> On Tue, Sep 18, 2018 at 9:14 AM Keshav Savant <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am evaluating apache storm (1.2.2) for fitting it into our business
>> model.
>>
>> I have a requirement of reading from IBM MQ queue, then do some business
>> & transformation on read items and finally push again to a different IBM MQ
>> queue.
>>
>> I was able to do that with kafka as spout & bolt, but I a not able to
>> find any solution for my above use case with IBM MQ.
>>
>> I found on few forums that it was not supported earlier, but not sure
>> about its current status.
>>
>> Can anyone having some info on it, some link/pointers will be helpful in
>> doing this (maybe with some customization).
>>
>> Thanks,
>> Keshav
>>
>>

-- 
Keshav
Mobile : +91-98786-23168

Reply via email to