Hi!

I want to point out that the totallyCounter var is not passed to the job by
reference, which is why you get different values during the computation. It is
also not a good idea to modify a var that lives outside the function's context.

If you want to calculate the total count, use map and then reduce the result.
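
As a rough sketch of what I mean (not tested against your job; the object name CountSketch, the helper totalCount, and the local master setting are assumptions made up for illustration), the count can be computed inside Spark with map and then reduce, so nothing depends on a var captured from outside the function:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of Spark or Ignite.
object CountSketch {

  // Map every record to 1 and sum the ones with reduce.
  // reduce throws on an empty RDD, so that case is guarded explicitly.
  def totalCount[T](rdd: RDD[T]): Long =
    if (rdd.isEmpty()) 0L
    else rdd.map(_ => 1L).reduce(_ + _)

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying the sketch out; on YARN the
    // master would come from spark-submit instead.
    val conf = new SparkConf().setAppName("count-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq("a", "b", "c", "d"))
      // The result is returned to the driver as a plain value.
      println(s"total = ${totalCount(rdd)}")
    } finally {
      sc.stop()
    }
  }
}
```

The value returned by reduce arrives on the driver as an ordinary result, so it can be added to a running total there without mutating anything from inside a closure.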

With best regards
Alisher Alimov
[email protected]




> On Aug 11, 2016, at 8:09, percent620 <[email protected]> wrote:
> 
> Hello, I want to integrate Ignite with Spark in embedded mode, and here are
> my detailed steps.
> 
> 1. Added Maven dependencies to pom.xml
> 
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-core</artifactId>
>            <version>1.6.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-indexing</artifactId>
>            <version>1.6.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-visor-console</artifactId>
>            <version>1.6.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-spring</artifactId>
>            <version>1.6.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-spark</artifactId>
>            <version>1.6.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.ignite</groupId>
>            <artifactId>ignite-yarn</artifactId>
>            <version>1.6.0</version>
>        </dependency>
> 
> 2. Wrote the Spark code and submitted it as a YARN application. The code is:
> val igniteContext = new IgniteContext[String, BaseLine](sc, () => new IgniteConfiguration(), false)
> 
> Passing false means embedded mode for Ignite, right?
> tempDStream.foreachRDD(rdd => {
>   val rddCounter = rdd.count().toInt
>   totallyCounter += rddCounter
>   println(DateUtil.getNowDate() + " invoked! count=" + rddCounter +
>     ";totallyCounter= => " + totallyCounter)
>   if (!rdd.isEmpty()) {
>     //==================================start======
>     val cacheRdd = igniteContext.fromCache("partitioned")
> 
>     // fianlLatestBaselineRDD: the full RDD that needs to be cached in Ignite
>     cacheRdd.savePairs(fianlLatestBaselineRDD)
> 
>     println("xx=> " + xxx() + "\t " +
>       " xxx => " + xxx() + "\t " +
>       " cacheRdd.counter=> " + cacheRdd.count())
>   }
> })
> 3. The code runs on YARN successfully, but I found that Ignite doesn't
> cache all of the data and discards some of it, for example:
> 2016-08-11 11:28:00:es write invoked! count=10;totally totallyCounter= => 10
> cacheRdd.start.counter=> 0
> 2、cacheRdd.start.fianlLatestBaselineRDD.counter=> 10
> xxx.count=> 140       yyy.count => 10   cacheRdd.counter=> 5(need to be 10)
> =======================================================================================================================
> 2016-08-11 11:28:48:write invoked! count= 9;totallyCounter= => 19
> cacheRdd.start.counter=> 5
> xxxx.count=> 126       yyy.count => 9   cacheRdd.counter=> 9(need to be 19)
> =========================================================================================================================
> 
> 
> Can anyone help me with this issue? Thanks again!
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-ignite-users.70518.x6.nabble.com/Embedded-mode-ignite-on-spark-tp6942.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
