Hi! Note that the totalCounter var is not passed to the job by reference, which is why you see different values during the computation. It is also not a good idea to modify a var that is defined outside the function's context.
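To make the point concrete, here is a minimal sketch in plain Scala (no Spark or Ignite required). It only imitates the situation: it assumes captured state reaches a remote task as a serialized copy, and it mimics that with Java serialization. The names Counter, ClosureCopyDemo, shipCopy and totalCount are hypothetical and exist only for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a mutable var captured by a closure.
final class Counter(var value: Int) extends java.io.Serializable

object ClosureCopyDemo {
  // Round-trips an object through Java serialization. This imitates (it is an
  // assumption of the sketch) how captured state is copied to another JVM.
  def shipCopy[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Map each batch to its size, then reduce the sizes to one total.
  // No shared mutable state is involved.
  def totalCount(batches: Seq[Seq[Int]]): Int =
    batches.map(_.size).reduceOption(_ + _).getOrElse(0)

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1, 2, 3), Seq(4, 5))

    val localCounter = new Counter(0)
    batches.foreach { batch =>
      val remoteCopy = shipCopy(localCounter) // the "task" sees a copy
      remoteCopy.value += batch.size          // the update stays in the copy
    }
    println(localCounter.value)  // 0: the original was never updated

    println(totalCount(batches)) // 5: sizes are returned and combined
  }
}
```

The second half avoids the problem by returning each partial count and combining the results, instead of mutating a variable from inside the function.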
If you want to calculate totalCount, use map and then reduce the result.

With best regards
Alisher Alimov
[email protected]

> On 11 Aug 2016, at 8:09, percent620 <[email protected]> wrote:
>
> Hello, I want to integrate Ignite with Spark in embedded mode, and here are
> my detailed steps for this.
>
> 1、Added Maven dependencies to pom.xml
>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-core</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-indexing</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-visor-console</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-spring</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-spark</artifactId>
>     <version>1.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.ignite</groupId>
>     <artifactId>ignite-yarn</artifactId>
>     <version>1.6.0</version>
> </dependency>
>
> 2、Wrote the Spark code and submitted it as a YARN application. The code is:
>
> val igniteContext = new IgniteContext[String, BaseLine](sc, () => new IgniteConfiguration(), false)
>
> false: is embedded mode for ignite, right?
>
> tempDStream.foreachRDD(rdd => {
>   val rddCounter = rdd.count().toInt
>   totallyCounter += rddCounter
>   println(DateUtil.getNowDate() + " invoked! count=" + rddCounter +
>     ";totallyCounter= => " + totallyCounter)
>   if (!rdd.isEmpty()) {
>     //==================================start======
>     val cacheRdd = igniteContext.fromCache("partitioned")
>
>     //fianlLatestBaselineRDD: total rdd that need to cache on ignite
>     cacheRdd.savePairs(fianlLatestBaselineRDD)
>
>     println("xx=> " + xxx() + "\t " +
>       " xxx => " + xxx() + "\t " +
>       " cacheRdd.counter=> " + cacheRdd.count())
>   }
> })
>
> 3、The code runs on YARN successfully, but I found that Ignite can't
> cache all the data and discards some of it, for example:
>
> 2016-08-11 11:28:00:es write invoked! count=10;totally totallyCounter= => 10
> cacheRdd.start.counter=> 0
> 2、cacheRdd.start.fianlLatestBaselineRDD.counter=> 10
> xxx.count=> 140 yyy.count => 10 cacheRdd.counter=> 5(need to be 10)
> =======================================================================================================================
> 2016-08-11 11:28:48:write invoked! count= 9;totallyCounter= => 19
> cacheRdd.start.counter=> 5
> xxxx.count=> 126 yyy.count => 9 cacheRdd.counter=> 9(need to be 19)
> =========================================================================================================================
>
> Can anyone help me with this issue? Thanks again!
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Embedded-mode-ignite-on-spark-tp6942.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
