Thank you! It works correctly.
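
For anyone who finds this thread later, here is a minimal sketch of why the missing flush() produced exactly those numbers. It is a toy model, not the Ignite API: ToyStreamer, sumAfterStreaming and the key pattern (i % 100) are hypothetical names and data made up for illustration, and the buffer size of 1024 entries is an assumption. It does fit the figures reported below: 976 * 1024 = 999424, which leaves 576 entries waiting in the buffer, and a file of 10 or 1000 lines never fills the buffer at all. A real streamer may keep several buffers and send them in parallel, so treat this only as an illustration of the batching idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BufferedStreamerSketch {

    /** Toy stand-in for a batching data streamer (hypothetical, not the Ignite API). */
    static final class ToyStreamer {
        private final Map<Integer, Long> cache;
        private final int bufferSize;
        private final List<Integer> buffer = new ArrayList<>();

        ToyStreamer(Map<Integer, Long> cache, int bufferSize) {
            this.cache = cache;
            this.bufferSize = bufferSize;
        }

        /** Buffers a key; the cache is only updated once the buffer is full. */
        void addData(int key) {
            buffer.add(key);
            if (buffer.size() >= bufferSize) {
                flush();
            }
        }

        /** Applies every buffered key to the cache (increment per key), then empties the buffer. */
        void flush() {
            for (int key : buffer) {
                cache.merge(key, 1L, Long::sum);
            }
            buffer.clear();
        }
    }

    /** Streams `entries` keys and returns the sum of all counters stored in the cache. */
    static long sumAfterStreaming(int entries, int bufferSize, boolean flushAtEnd) {
        Map<Integer, Long> cache = new HashMap<>();
        ToyStreamer streamer = new ToyStreamer(cache, bufferSize);
        for (int i = 0; i < entries; i++) {
            streamer.addData(i % 100); // hypothetical keys; the real file contents are unknown
        }
        if (flushAtEnd) {
            streamer.flush();
        }
        long sum = 0;
        for (long value : cache.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        int bufferSize = 1024; // assumption, chosen because it matches the reported numbers
        System.out.println(sumAfterStreaming(1_000_000, bufferSize, false)); // 999424
        System.out.println(sumAfterStreaming(1_000, bufferSize, false));     // 0
        System.out.println(sumAfterStreaming(1_000_000, bufferSize, true));  // 1000000
    }
}
```

With the final flush() the last partial batch reaches the cache, which is what the fix suggested in the quoted reply does.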

>Monday, 23 May 2016, 11:11 +03:00, from Vladimir Ozerov 
><[email protected]>:
>
>Hi Alexander,
>
>Please make sure that you flush data streamer before checking the "sum" value:
>fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
>streamer.flush();
>Vladimir.
>
>On Mon, May 23, 2016 at 10:35 AM, Александр Савинов  < [email protected] > 
>wrote:
>>
>>Hello.
>>I have a problem with the stream API and Ignite. The value of the "sum" variable 
>>should be 1000000 (the number of lines in test.csv), but it is 
>>999424. If the file is small (10 or even 1000 lines), there is nothing in the cache.
>>Thank you.
>>If you know Russian, please reply in Russian.
>>IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
>>igniteConfiguration.setPeerClassLoadingEnabled(true);
>>Ignite ignite = Ignition.start(igniteConfiguration);
>>CacheConfiguration<Integer, Long> cacheConfiguration = new 
>>CacheConfiguration<>("cache");
>>IgniteCache<Integer, Long> cache = 
>>ignite.getOrCreateCache(cacheConfiguration);
>>IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer("cache");
>>streamer.receiver(StreamTransformer.from((entry, arg)->{
>>    Long value = entry.getValue();
>>    entry.setValue(value==null ? 1L : value + 1L);
>>    return entry;
>>}));
>>Stream<String> fileStream = Files.lines(Paths.get("test.csv"));
>>fileStream.mapToInt(Integer::parseInt).forEach(line->streamer.addData(line,1L));
>>cache.forEach((entry)->System.out.println(entry.getKey() + ": " + 
>>entry.getValue()));
>>int s = 0;
>>Iterator<Cache.Entry<Integer, Long>> iterator = cache.iterator();
>>while(iterator.hasNext()){
>>    Cache.Entry<Integer, Long> entry = iterator.next();
>>    s+=entry.getValue();
>>}
>>System.out.println(s);
>>cache.clear();
>>ignite.close();
>>
>>-- 
>>Best regards, Alexander.
>


-- 
Best regards, Alexander.