Thanks you! It works correctly.
>Понедельник, 23 мая 2016, 11:11 +03:00 от Vladimir Ozerov ><[email protected]>: > >Hi Alexander, > >Please make sure that you flush data streamer before checking the "sum" value: >fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L)); >streamer.flush(); >Vladimir. > >On Mon, May 23, 2016 at 10:35 AM, Александр Савинов < [email protected] > >wrote: >> >>Hello. >>I have a problem with stream API and Ignite. The value of "sum" variable >>should be 1000000 (that equals to length of file test.csv), but it equals >>999424. If file length is small (10 or even 1000) there is nothing in cache. >>Thank you. >>Если вы знаете русский, ответьте, пожалуйста, на русском. >>IgniteConfiguration igniteConfiguration = new IgniteConfiguration(); >>igniteConfiguration.setPeerClassLoadingEnabled(true); >>Ignite ignite = Ignition.start(igniteConfiguration); >>CacheConfiguration<Integer, Long> cacheConfiguration = new >>CacheConfiguration<>("cache"); >>IgniteCache<Integer, Long> cache = >>ignite.getOrCreateCache(cacheConfiguration); >>IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache"); >>streamer.receiver(StreamTransformer.from((entry, arg)->{ >> Long value = entry.getValue(); >> entry.setValue(value==null ? 1L : value + 1L); >> return entry; >>})); >>Stream<String> fileStream = Files.lines(Paths.get("test.csv")); >>fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L)); >>cache.forEach((entry)->System.out.println(entry.getKey() + ": " + >>entry.getValue())); >>int s = 0; >>Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator(); >>while(iterator.hasNext()){ >> Cache.Entry<Integer, Long> entry = iterator.next(); >> s+=entry.getValue(); >>} >>System.out.println(s); >>cache.clear(); >>ignite.close(); >> >>-- >>С уважением, Александр. >>---------------------------------------------------------------------- >> >>-- >>С уважением, Александр. > -- С уважением, Александр.
