Hi Alexander, Please make sure that you flush data streamer before checking the "sum" value:
fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L)); streamer.flush(); Vladimir. On Mon, May 23, 2016 at 10:35 AM, Александр Савинов <rain-sa...@mail.ru> wrote: > > Hello. > I have a problem with stream API and Ignite. The value of "sum" variable > should be 1000000 (that equals to length of file test.csv), but it equals > 999424. If file length is small (10 or even 1000) there is nothing in cache. > Thank you. > Если вы знаете русский, ответьте, пожалуйста, на русском. > <//e.mail.ru/compose/?mailto=mailto%3au...@ignite.apache.org> > > IgniteConfiguration igniteConfiguration = new IgniteConfiguration(); > igniteConfiguration.setPeerClassLoadingEnabled(true); > Ignite ignite = Ignition.start(igniteConfiguration); > CacheConfiguration<Integer, Long> cacheConfiguration = new > CacheConfiguration<>("cache"); > IgniteCache<Integer, Long> cache = > ignite.getOrCreateCache(cacheConfiguration); > IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache"); > streamer.receiver(StreamTransformer.from((entry, arg)->{ > Long value = entry.getValue(); > entry.setValue(value==null ? 1L : value + 1L); > return entry; > })); > Stream<String> fileStream = Files.lines(Paths.get("test.csv")); > fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L)); > cache.forEach((entry)->System.out.println(entry.getKey() + ": " + > entry.getValue())); > int s = 0; > Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator(); > while(iterator.hasNext()){ > Cache.Entry<Integer, Long> entry = iterator.next(); > s+=entry.getValue(); > } > System.out.println(s); > cache.clear(); > ignite.close(); > > > > -- > С уважением, Александр. > > ------------------------------ > > -- > С уважением, Александр. >