Support,

Given the following:

Background:
JBoss ESB 4.12        (JMS messaging)
Apache Ignite 1.5 final

1. To begin, I am starting to 2 server nodes with same configuration that
comes with Ignite (ie:
..\apache-ignite-fabric-1.5.0.final-bin\\examples\\config\\example-ignite.xml)
2. Next I start JBoss ESB and deploy a Topic called
"/topic/quickstart_jmstopic_topic"
3. Next, I execute JMSStreamWords.
4. Finally, I send a message to "/topic/quickstart_jmstopic_topic"
5. No data is written to cache.

Problem:

I am attempting to use the JMS Data Streamer to inject data into Ignite
cache.
I am using a JMS Topic for a destination.  

When I debug the JMSStreamWords class, I am seeing that the message is
received
and the 'answer.put(tokens[0], tokens[1]);' statement is completed without
exceptions/errors.

Any ideas on why data is not written to cache as expected...
I would appreciate any suggestions for resolving this issue...

Please advise.

Thanks in advance.
Source Code:

package netmille.examples.streaming.jms;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.jms11.JmsStreamer;
import org.apache.ignite.stream.jms11.MessageTransformer;

import netmille.examples.ExamplesUtils;


public class JMSStreamWords {
        
        private static Ignite ignite = null;
        private static JmsStreamer<TextMessage, String, String> jmsStreamer = 
new
JmsStreamer<>();
         
    /**
     * Starts JMS words streaming.
     *
     * @param args Command line arguments (none required).
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        // Mark this cluster member as client.
        Ignition.setClientMode(true);
  
        try {
                        
            ignite =
Ignition.start("C:\\netmilleRoot\\tools\\apache-ignite-fabric-1.5.0.final-bin\\examples\\config\\example-ignite.xml");
            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            IgniteCache<String, String> stmCache =
ignite.getOrCreateCache(CacheConfig.wordCache());

            IgniteDataStreamer<String, String> dataStreamer =
ignite.dataStreamer("words");
            dataStreamer.allowOverwrite(true);

            Properties properties1 = new Properties();
                properties1.put(Context.INITIAL_CONTEXT_FACTORY,
                                "org.jnp.interfaces.NamingContextFactory");
                properties1.put(Context.URL_PKG_PREFIXES,
                                "org.jboss.naming:org.jnp.interfaces");
                properties1.put(Context.PROVIDER_URL, "jnp://127.0.0.1:1199");
                InitialContext iniCtx = new InitialContext(properties1);

                TopicConnectionFactory tcf = (TopicConnectionFactory)
iniCtx.lookup("ConnectionFactory");
                  
         // create a JMS streamer and plug the data streamer into it
                
            jmsStreamer.setIgnite(ignite);
            jmsStreamer.setStreamer(dataStreamer);
            jmsStreamer.setConnectionFactory(tcf);
            Topic topic = (Topic)
iniCtx.lookup("/topic/quickstart_jmstopic_topic");
            
            jmsStreamer.setDestination(topic);
            jmsStreamer.setTransacted(true);
            jmsStreamer.setTransformer(new MessageTransformer<TextMessage,
String, String>() {
                @Override
                public Map<String, String> apply(TextMessage message) {
                    final Map<String, String> answer = new HashMap<>();
                    String text;
                    try {
                        text = message.getText();
                    }
                    catch (JMSException e) {
                        System.out.println("Could not parse message." + e);
                        return Collections.emptyMap();
                    }
                    for (String s : text.split("\n")) {
                        String[] tokens = s.split(",");
                        answer.put(tokens[0], tokens[1]);
                        System.out.println("added " + tokens[0]);
                    }
                    return answer;
                }
            });
            jmsStreamer.start();
        }
        catch(Exception e)
        {
         System.out.println(e);
        }
    }
}




package netmille.examples.streaming.jms;


import org.apache.ignite.configuration.CacheConfiguration;

public class CacheConfig {
    /**
     * Configure streaming cache.
     */
    public static CacheConfiguration<String, String> wordCache() {
        CacheConfiguration<String, String> cfg = new
CacheConfiguration<>("words");

        // Index all words streamed into cache.
        cfg.setIndexedTypes(String.class, String.class);
        cfg.setStatisticsEnabled(true);

        return cfg;
    }
}




--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/JMS-Data-Streamer-Not-writing-to-data-to-cache-when-message-received-by-Streamer-tp3590.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Reply via email to