If we attach a receiver to the data streamer, we must call the put method
ourselves to get a value into the cache.
However, I am unable to retrieve the new value of the cache entry inside
StreamTransformer.from. When I call e.getValue() in the following snippet, it
returns the value previously stored in the cache for the current key. How can
I get the value that is currently being added?

CacheConfiguration<Integer, Integer> cacheConfiguration =
    new CacheConfiguration<>("SAMPLE");
IgniteCache<Integer, Integer> igniteCache =
    ignite.getOrCreateCache(cacheConfiguration);

try (IgniteDataStreamer<Integer, Integer> dataStreamer =
         ignite.dataStreamer(igniteCache.getName())) {
    dataStreamer.allowOverwrite(true);
    dataStreamer.receiver(StreamTransformer.from((e, arg) -> {
        System.out.println("Key:" + e.getKey());
        // This always returns the previously stored value
        System.out.println("Val:" + e.getValue());
        if (e.getValue() == null) {
            igniteCache.put(e.getKey(), 0);
        }
        return null;
    }));

    dataStreamer.addData(1, 1);
    dataStreamer.addData(2, 1);
    dataStreamer.addData(1, 2);
}

Although I was able to solve my problem by using StreamVisitor instead of
StreamTransformer, I struggled with the StreamTransformer semantics for a
significant amount of time. I am still not sure whether I am thinking about
StreamTransformer correctly.
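
To make the semantics in question concrete, here is a minimal sketch that uses
only the JDK and no Ignite classes. It assumes a transformer-style receiver
behaves like an entry processor: the closure is handed the value already stored
for the key (what e.getValue() returns above) together with the incoming
streamed value, and the cache changes only through what the closure decides to
store. In Ignite itself the incoming value appears, as far as I can tell, to be
passed as the first closure argument (arg[0]), but that should be checked
against the StreamTransformer Javadoc. TransformerModel and stream are
hypothetical names used for illustration only.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical model of a transformer-style receiver; not an Ignite API.
class TransformerModel {

    // Feeds (key, value) pairs through a closure that sees both the stored
    // value and the incoming value, and stores whatever the closure returns.
    static Map<Integer, Integer> stream(
            int[][] updates,
            BiFunction<Integer, Integer, Integer> transform) {
        Map<Integer, Integer> cache = new TreeMap<>();
        for (int[] update : updates) {
            int key = update[0];
            int incoming = update[1];
            // Value already in the cache; null the first time a key is seen.
            Integer stored = cache.get(key);
            cache.put(key, transform.apply(stored, incoming));
        }
        return cache;
    }

    public static void main(String[] args) {
        // Same sequence as the addData calls in the snippet above.
        int[][] updates = {{1, 1}, {2, 1}, {1, 2}};
        Map<Integer, Integer> result = stream(updates,
            (stored, incoming) -> stored == null ? incoming : stored + incoming);
        System.out.println(result); // prints {1=3, 2=1}
    }
}
```

In this model the second update for key 1 sees the stored value 1 and the
incoming value 2, which mirrors why the stored value, and not the new one,
shows up when only the entry is inspected.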
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/StreamTransformer-usage-issue-How-to-get-value-of-inbound-entry-tp1690.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.