Hi Alexander,

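Please make sure that you flush the data streamer before checking the "sum"
value. The streamer buffers entries and sends them in batches, so without a
flush the last partial batch never reaches the cache: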

fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
streamer.flush();
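
To illustrate why the missing flush gives exactly these numbers, here is a
small stand-alone sketch. It does not use Ignite at all: the class below is a
hypothetical stand-in for a buffering streamer, and the batch size of 1024 is
an assumption inferred from the figures in the report (999424 = 976 * 1024),
not something stated in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a buffering streamer; this is NOT the Ignite API.
class BufferedStreamerSketch {
    // Assumed batch size, chosen because 976 * 1024 = 999424.
    static final int BUFFER_SIZE = 1024;

    private final Map<Integer, Long> cache = new HashMap<>();
    private final int[] buffer = new int[BUFFER_SIZE];
    private int buffered = 0;

    // Queue a key; entries reach the "cache" only when a full batch is ready.
    void addData(int key) {
        buffer[buffered++] = key;
        if (buffered == BUFFER_SIZE) {
            flush();
        }
    }

    // Deliver whatever is buffered, incrementing the per-key counter.
    void flush() {
        for (int i = 0; i < buffered; i++) {
            cache.merge(buffer[i], 1L, Long::sum);
        }
        buffered = 0;
    }

    // Sum of all counters currently visible in the "cache".
    long sum() {
        long s = 0;
        for (long v : cache.values()) {
            s += v;
        }
        return s;
    }

    // Stream `records` keys and report the sum, with or without a final flush.
    static long run(int records, boolean flushAtEnd) {
        BufferedStreamerSketch streamer = new BufferedStreamerSketch();
        for (int i = 0; i < records; i++) {
            streamer.addData(i % 100); // keys repeat, as they may in the CSV
        }
        if (flushAtEnd) {
            streamer.flush();
        }
        return streamer.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(1_000_000, false)); // 999424
        System.out.println(run(1_000, false));     // 0
        System.out.println(run(1_000_000, true));  // 1000000
    }
}
```

Under that assumption, the last 576 records of the large file and all records
of a file shorter than one batch are still sitting in the buffer when the
cache is read, which matches both symptoms described below.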

Vladimir.


On Mon, May 23, 2016 at 10:35 AM, Александр Савинов <rain-sa...@mail.ru>
wrote:

>
> Hello.
> I have a problem with the Stream API and Ignite. The value of the "sum"
> variable should be 1000000 (the number of lines in test.csv), but it is
> 999424. If the file is small (10 or even 1000 lines), nothing ends up in
> the cache.
> Thank you.
> If you know Russian, please reply in Russian.
>
> IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
> igniteConfiguration.setPeerClassLoadingEnabled(true);
> Ignite ignite = Ignition.start(igniteConfiguration);
> CacheConfiguration<Integer, Long> cacheConfiguration = new CacheConfiguration<>("cache");
> IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cacheConfiguration);
> IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");
> streamer.receiver(StreamTransformer.from((entry, arg)->{
>     Long value = entry.getValue();
>     entry.setValue(value==null ? 1L : value + 1L);
>     return entry;
> }));
> Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
> fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
> cache.forEach((entry)->System.out.println(entry.getKey() + ": " + entry.getValue()));
> int s = 0;
> Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
> while(iterator.hasNext()){
>     Cache.Entry<Integer, Long> entry = iterator.next();
>     s+=entry.getValue();
> }
> System.out.println(s);
> cache.clear();
> ignite.close();
>
>
>
> --
> Best regards, Alexander.
>
>
