Re: How to achieve writethrough with ignite data streamer
Hm.. Not sure what happened exactly in your case, but a cache store is never deployed via peer class loading. The class must be explicitly deployed on every node prior to startup.

-Val

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
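One quick way to confirm that a class is visible on a node is a plain JDK classpath check. The following is only a sketch: StoreClassCheck and isOnClasspath are hypothetical helper names, and "MyCacheStoreAdapter" is a placeholder because the package of the store class is not given in this thread. It assumes you run it with the same classpath as the server node.

```java
/** Hypothetical helper, not part of Ignite: checks whether a class is visible on the classpath. */
final class StoreClassCheck {
    /** Returns true if the named class can be loaded (without initializing it). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, StoreClassCheck.class.getClassLoader());
            return true;
        }
        catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself or one of its dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the fully qualified name of your store class as the first argument.
        String name = args.length > 0 ? args[0] : "MyCacheStoreAdapter";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this reports false on a server node, the jar containing the store class is probably missing from that node's classpath.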
Re: How to achieve writethrough with ignite data streamer
The issue here was the peerClassLoading flag. I had not enabled it on the server, and according to the server logs the cache store factory class was not getting loaded. After I enabled it, write-through works.
Re: How to achieve writethrough with ignite data streamer
If the skipStore flag is enabled, data won't be propagated to the store. Are you sure that you have the writeThrough flag enabled? Additionally, you need to check your CacheStore implementation. If you don't have any ideas, you can post it here and the community will check it.

Evgenii
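To make the flag interaction concrete, here is a toy model in plain Java. It is not Ignite code: WriteThroughModel and its methods are made up for illustration, and the rule it encodes (allowOverwrite == false implies skipStore, and the store is written only when writeThrough is on and skipStore is effectively off) is an assumption taken from the IgniteDataStreamer javadoc quoted in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy model of the flags discussed in this thread. Not Ignite code. */
final class WriteThroughModel<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final boolean writeThrough;         // models the cache's writeThrough setting
    private final BiConsumer<K, V> storeWriter; // stands in for the store's write method
    private boolean allowOverwrite;             // false by default
    private boolean skipStore;                  // false by default

    WriteThroughModel(boolean writeThrough, BiConsumer<K, V> storeWriter) {
        this.writeThrough = writeThrough;
        this.storeWriter = storeWriter;
    }

    void allowOverwrite(boolean allow) { this.allowOverwrite = allow; }

    void skipStore(boolean skip) { this.skipStore = skip; }

    /** Assumed rule: allowOverwrite == false implies skipStore. */
    boolean reachesStore() {
        boolean effectiveSkipStore = skipStore || !allowOverwrite;
        return writeThrough && !effectiveSkipStore;
    }

    void addData(K key, V val) {
        if (allowOverwrite)
            cache.put(key, val);          // existing values may be replaced
        else
            cache.putIfAbsent(key, val);  // existing values are kept

        if (reachesStore())
            storeWriter.accept(key, val);
    }

    V get(K key) { return cache.get(key); }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        WriteThroughModel<Long, String> m =
            new WriteThroughModel<>(true, (k, v) -> written.add(k + "=" + v));

        m.addData(1L, "first");      // allowOverwrite is false: cached, store untouched
        m.allowOverwrite(true);
        m.addData(2L, "second");     // now the store writer is called
        System.out.println(written); // prints [2=second]
    }
}
```

In this model an entry only reaches the store writer when writeThrough is on, allowOverwrite is true and skipStore is false, which matches the checklist above.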
Re: How to achieve writethrough with ignite data streamer
I have set the overwrite flag to true:

    stmr.allowOverwrite(true);

What is the significance of the skipStore flag? What is the flow for an entry from setMultipleTupleExtractor to reach the cache? I am thinking it should go through the write method, with which it gets put to the cache.

I have overridden the write method in my cache store and set the cache store factory as below:

    myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
Re: How to achieve writethrough with ignite data streamer
Hi,

You need to set IgniteDataStreamer.allowOverwrite(true). As the javadoc says:

    Note that when this flag is {@code false}, updates will not be propagated
    to the cache store (i.e. {@link #skipStore()} flag will be set to
    {@code true} implicitly).

Evgenii

2018-03-12 11:34 GMT+03:00 vbm:

> Hi,
>
> I am using the Ignite data streamer to pull data from a Kafka topic.
> The data is getting loaded into the Ignite cache, but it is not getting
> written to the 3rd party persistence (MySQL DB).
>
> I have set the cacheStoreFactory to my custom class, which extends the
> CacheStoreAdapter class.
>
> Code snippet:
>
>     myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
>     // Set as write-through cache
>     myCacheCfg.setWriteThrough(true);
>
>     try (IgniteDataStreamer<Long, MyOrders> stmr = ignite.dataStreamer("myCache")) {
>         // allow overwriting cache data
>         stmr.allowOverwrite(true);
>
>         kafkaStreamer = new KafkaStreamer<>();
>         kafkaStreamer.setIgnite(ignite);
>         kafkaStreamer.setStreamer(stmr);
>
>         // set the topic
>         kafkaStreamer.setTopic(topic);
>
>         // set the number of threads to process Kafka streams
>         kafkaStreamer.setThreads(1);
>
>         Properties settings = new Properties();
>
>         kafkaStreamer.setConsumerConfig(new ConsumerConfig(settings));
>         kafkaStreamer.setMultipleTupleExtractor(
>             new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, Long, MyOrders>() {
>                 @Override public Map<Long, MyOrders> extract(MessageAndMetadata<byte[], byte[]> msg) {
>                     Map<Long, MyOrders> entries = new HashMap<>();
>
>                     try {
>                         // Convert the received message as required and add it to the map
>                         entries.put(key, order);
>                     }
>                     catch (Exception ex) {
>                         ex.printStackTrace();
>                     }
>
>                     return entries;
>                 }
>             });
>
> With this code, I am able to get the data into the cache, but the
> write-through behaviour is not getting triggered. The code in my cache
> store (write, load) is not getting called.
>
> Can anyone help me with this and let me know how to achieve the
> write-through behaviour?
>
> Regards,
> Vishwas
How to achieve writethrough with ignite data streamer
Hi,

I am using the Ignite data streamer to pull data from a Kafka topic. The data is getting loaded into the Ignite cache, but it is not getting written to the 3rd party persistence (MySQL DB).

I have set the cacheStoreFactory to my custom class, which extends the CacheStoreAdapter class.

Code snippet:

    myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
    // Set as write-through cache
    myCacheCfg.setWriteThrough(true);

    try (IgniteDataStreamer<Long, MyOrders> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer = new KafkaStreamer<>();
        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic(topic);

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        Properties settings = new Properties();

        kafkaStreamer.setConsumerConfig(new ConsumerConfig(settings));
        kafkaStreamer.setMultipleTupleExtractor(
            new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, Long, MyOrders>() {
                @Override public Map<Long, MyOrders> extract(MessageAndMetadata<byte[], byte[]> msg) {
                    Map<Long, MyOrders> entries = new HashMap<>();

                    try {
                        // Convert the received message as required and add it to the map
                        entries.put(key, order);
                    }
                    catch (Exception ex) {
                        ex.printStackTrace();
                    }

                    return entries;
                }
            });

With this code, I am able to get the data into the cache, but the write-through behaviour is not getting triggered. The code in my cache store (write, load) is not getting called.

Can anyone help me with this and let me know how to achieve the write-through behaviour?

Regards,
Vishwas