Support,
Given the following:
Background:
JBoss ESB 4.12 (JMS messaging)
Apache Ignite 1.5 final
1. To begin, I start 2 server nodes with the same configuration that
comes with Ignite (i.e.,
..\apache-ignite-fabric-1.5.0.final-bin\examples\config\example-ignite.xml)
2. Next I start JBoss ESB and deploy a Topic called
"/topic/quickstart_jmstopic_topic"
3. Next, I execute JMSStreamWords.
4. Finally, I send a message to "/topic/quickstart_jmstopic_topic"
5. No data is written to cache.
Problem:
I am attempting to use the JMS Data Streamer to inject data into an Ignite
cache, with a JMS Topic as the destination.
When I debug the JMSStreamWords class, I can see that the message is
received and that the 'answer.put(tokens[0], tokens[1]);' statement completes
without exceptions or errors.
Any ideas on why the data is not written to the cache as expected?
I would appreciate any suggestions for resolving this issue.
Please advise.
Thanks in advance.
Source Code:
package netmille.examples.streaming.jms;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.jms11.JmsStreamer;
import org.apache.ignite.stream.jms11.MessageTransformer;

import netmille.examples.ExamplesUtils;

public class JMSStreamWords {
    private static Ignite ignite = null;

    private static JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();

    /**
     * Starts JMS words streaming.
     *
     * @param args Command line arguments (none required).
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        // Mark this cluster member as client.
        Ignition.setClientMode(true);

        try {
            ignite = Ignition.start("C:\\netmilleRoot\\tools\\apache-ignite-fabric-1.5.0.final-bin\\examples\\config\\example-ignite.xml");

            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            IgniteCache<String, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());

            IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("words");
            dataStreamer.allowOverwrite(true);

            Properties properties1 = new Properties();
            properties1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
            properties1.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
            properties1.put(Context.PROVIDER_URL, "jnp://127.0.0.1:1199");

            InitialContext iniCtx = new InitialContext(properties1);
            TopicConnectionFactory tcf = (TopicConnectionFactory) iniCtx.lookup("ConnectionFactory");

            // create a JMS streamer and plug the data streamer into it
            jmsStreamer.setIgnite(ignite);
            jmsStreamer.setStreamer(dataStreamer);
            jmsStreamer.setConnectionFactory(tcf);

            Topic topic = (Topic) iniCtx.lookup("/topic/quickstart_jmstopic_topic");
            jmsStreamer.setDestination(topic);
            jmsStreamer.setTransacted(true);

            jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
                @Override
                public Map<String, String> apply(TextMessage message) {
                    final Map<String, String> answer = new HashMap<>();
                    String text;

                    try {
                        text = message.getText();
                    }
                    catch (JMSException e) {
                        System.out.println("Could not parse message." + e);
                        return Collections.emptyMap();
                    }

                    for (String s : text.split("\n")) {
                        String[] tokens = s.split(",");
                        answer.put(tokens[0], tokens[1]);
                        System.out.println("added " + tokens[0]);
                    }

                    return answer;
                }
            });

            jmsStreamer.start();
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }
}
package netmille.examples.streaming.jms;

import org.apache.ignite.configuration.CacheConfiguration;

public class CacheConfig {
    /**
     * Configure streaming cache.
     */
    public static CacheConfiguration<String, String> wordCache() {
        CacheConfiguration<String, String> cfg = new CacheConfiguration<>("words");

        // Index all words streamed into cache.
        cfg.setIndexedTypes(String.class, String.class);
        cfg.setStatisticsEnabled(true);

        return cfg;
    }
}
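
For reference, the parsing done inside the transformer can be exercised on
its own, without JMS or Ignite, to rule it out as the source of the problem.
The following is only a minimal sketch under stated assumptions: the class
name WordLineParser and its parse method are hypothetical and not part of the
example above, and skipping lines that contain no comma is an illustrative
choice (the transformer as posted would throw an
ArrayIndexOutOfBoundsException on such a line).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the posted example): isolates the
// "key,value per line" parsing that the MessageTransformer performs.
final class WordLineParser {
    private WordLineParser() {
    }

    /**
     * Splits the text into lines and each line into comma-separated tokens,
     * mapping the first token to the second, as the posted transformer does.
     * Assumption: lines with fewer than two tokens are skipped instead of
     * failing.
     */
    static Map<String, String> parse(String text) {
        Map<String, String> answer = new LinkedHashMap<>();

        if (text == null)
            return answer;

        for (String line : text.split("\n")) {
            String[] tokens = line.split(",");

            // Guard against lines such as "malformed" or "key," that do not
            // yield both a key and a value.
            if (tokens.length < 2)
                continue;

            answer.put(tokens[0], tokens[1]);
        }

        return answer;
    }

    public static void main(String[] args) {
        // Two well-formed lines and one line without a comma.
        System.out.println(parse("hello,1\nworld,2\nmalformed"));
    }
}
```

If this produces the expected map for the message text being sent, the
transformer's parsing is probably not what prevents the entries from showing
up in the cache.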
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/JMS-Data-Streamer-Not-writing-to-data-to-cache-when-message-received-by-Streamer-tp3590.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.