Hi,
I am using the Ignite data streamer to pull data from a Kafka topic.
The data is loaded into the Ignite cache, but it is not
written to the third-party persistence store (a MySQL database).
I have set the cacheStoreFactory to my custom class, which extends the
CacheStoreAdapter class.
Code Snippet:
myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
// Set as write-through cache
myCacheCfg.setWriteThrough(true);

try (IgniteDataStreamer<Long, MyOrders> stmr = ignite.dataStreamer("myCache")) {
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer = new KafkaStreamer<>();
    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    // set the topic
    kafkaStreamer.setTopic(topic);
    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);

    Properties settings = new Properties();
    kafkaStreamer.setConsumerConfig(new ConsumerConfig(settings));

    kafkaStreamer.setMultipleTupleExtractor(
        new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, Long, MyOrders>() {
            @Override public Map<Long, MyOrders> extract(MessageAndMetadata<byte[], byte[]> msg) {
                Map<Long, MyOrders> entries = new HashMap<>();
                try {
                    // Convert the received message to my requirement
                    // and add it to the map
                    entries.put(key, order);
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
                return entries;
            }
        });
With this code, I am able to get the data into the cache, but the
write-through behaviour is not triggered. The methods of my
cache store class (write, load) are never called.
Can anyone help me with this and let me know how to achieve
write-through behaviour?
Regards,
Vishwas