Hi,

You need to set IgniteDataStreamer.allowOverwrite(true). As the javadoc
says:

  Note that when this flag is false, updates will not be propagated to
  the cache store (i.e. the skipStore() flag will be set to true
  implicitly).
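
To make the quoted rule concrete, here is a small plain-Java model of it.
This is not Ignite code: ToyStreamer and its two maps are hypothetical
stand-ins for the data streamer, the cache and the cache store, and the
sketch only mirrors the flag behaviour the javadoc describes (the flag is
false by default, and while it is false the store is skipped).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the javadoc rule; none of this is Ignite API.
public class AllowOverwriteModel {

    // Hypothetical stand-in for IgniteDataStreamer, reduced to the one flag.
    static class ToyStreamer {
        final Map<Long, String> cache = new HashMap<>(); // stands in for the Ignite cache
        final Map<Long, String> store = new HashMap<>(); // stands in for the MySQL-backed store

        private boolean allowOverwrite = false; // same default as the real flag

        void allowOverwrite(boolean flag) {
            this.allowOverwrite = flag;
        }

        // The rule from the javadoc: skipStore is implied while allowOverwrite is false.
        boolean skipStore() {
            return !allowOverwrite;
        }

        void addData(long key, String val) {
            cache.put(key, val);       // the cache is always updated
            if (!skipStore()) {
                store.put(key, val);   // the store only sees the update when not skipped
            }
        }
    }

    public static void main(String[] args) {
        ToyStreamer stmr = new ToyStreamer();

        stmr.addData(1L, "order-1");   // flag still false: store is skipped
        stmr.allowOverwrite(true);
        stmr.addData(2L, "order-2");   // flag true: update reaches the store

        System.out.println("cache=" + stmr.cache.keySet() + " store=" + stmr.store.keySet());
    }
}
```

In this model only the second entry reaches the store, so it prints
cache=[1, 2] store=[2]. With the real streamer the equivalent step is to
call allowOverwrite(true) before adding any data.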


Evgenii


2018-03-12 11:34 GMT+03:00 vbm <bmvish...@gmail.com>:

> Hi,
>
> I am using the Ignite data streamer to pull data from a Kafka topic.
> The data is getting loaded into the Ignite cache, but it is not getting
> written to the 3rd party persistence (MySQL DB).
>
> I have set the cacheStoreFactory to my custom class, which extends the
> CacheStoreAdapter class.
>
> Code Snippet:
>
>
> myCacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStoreAdapter.class));
> // Set as write-through cache
> myCacheCfg.setWriteThrough(true);
>
> try (IgniteDataStreamer<Long, MyOrders> stmr = ignite.dataStreamer("myCache")) {
>     // allow overwriting cache data
>     stmr.allowOverwrite(true);
>
>     kafkaStreamer = new KafkaStreamer<>();
>     kafkaStreamer.setIgnite(ignite);
>
>     kafkaStreamer.setStreamer(stmr);
>
>     // set the topic
>     kafkaStreamer.setTopic(topic);
>
>     // set the number of threads to process Kafka streams
>     kafkaStreamer.setThreads(1);
>
>     Properties settings = new Properties();
>
>     kafkaStreamer.setConsumerConfig(new ConsumerConfig(settings));
>     kafkaStreamer.setMultipleTupleExtractor(
>         new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, Long, MyOrders>() {
>             @Override public Map<Long, MyOrders> extract(MessageAndMetadata<byte[], byte[]> msg) {
>                 Map<Long, MyOrders> entries = new HashMap<>();
>
>                 try {
>                     // Converting the message received to my requirement
>                     // and adding it to the map
>                     entries.put(key, order);
>                 }
>                 catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>
>                 return entries;
>             }
>         });
>
> With this code, I am able to get the data into the cache. But the
> write-through behaviour is not getting triggered. The code in my
> cacheStoreFactory (write, load) is not getting called.
>
>
> Can anyone help me with this and let me know how to achieve the
> write-through behaviour?
>
>
> Regards,
> Vishwas
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
