Dmitry, it isn't an Ignite issue but a Spark limitation. The RDD class defines SparkContext as a transient field, so an RDD can't be serialized and deserialized. As I understand it, a Spark RDD can't be shared between different Spark contexts.
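A minimal sketch of the collection-based workaround (not tested against a live cluster; the cache name "outpatient", the key "rows1", and the class name are illustrative assumptions) — cache the collected rows instead of the RDD itself:

```java
import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;

public class CollectedRowsExample {

    /** Store the DataFrame's rows (not the RDD) in an Ignite cache. */
    static void put(Ignite ignite, DataFrame df) {
        // collect() materializes the rows on the driver as a plain
        // java.util.List, which is serializable -- unlike the RDD, whose
        // transient SparkContext is lost on deserialization.
        // Caveat: the data set must fit in driver memory.
        List<Row> rows = df.toJavaRDD().collect();

        IgniteCache<String, List<Row>> cache = ignite.getOrCreateCache("outpatient");
        cache.put("rows1", rows);
    }

    /** Rebuild a DataFrame from the cached rows, possibly in another Spark context. */
    static DataFrame get(Ignite ignite, JavaSparkContext jsc, SQLContext sql, StructType schema) {
        IgniteCache<String, List<Row>> cache = ignite.getOrCreateCache("outpatient");
        List<Row> rows = cache.get("rows1");
        return sql.createDataFrame(jsc.parallelize(rows), schema);
    }
}
```

For data sets too large to collect on the driver, Spark's own persistence (e.g. df.persist(StorageLevel.MEMORY_AND_DISK()) or writing the intermediate result back to Parquet) avoids this limitation without serializing anything through Ignite.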
See also this post on StackOverflow (http://stackoverflow.com/questions/29567247/serializing-rdd) for more information. Maybe a better strategy is to store the RDD transformed into a collection, or to use Spark persistence for storing intermediate RDDs.

On Wed, Feb 10, 2016 at 11:18 AM, Dmitriy Morozov <[email protected]> wrote:

> Thanks Val!
>
> I was able to configure Ignite's cache with Spring config.
>
> > Are there any other methods that should be optimized?
>
> It's hard to tell right now as I'm still trying to figure out how to cache
> Spark's DataFrame in Ignite. It seems like right now it's only possible to
> cache IgniteRDD.
> I was trying to do it as below:
>
> final SparkConf sparkConf = new SparkConf()
>     .setAppName("shared-rdd-example")
>     .setMaster("local");
>
> final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> final SQLContext sqlContext = new SQLContext(sparkContext);
> final DataFrame df =
>     sqlContext.load("/Users/dmi3y/temp/outpatient-2000.parquet");
>
> final IgniteContext igniteContext = new IgniteContext(sparkContext.sc(),
>     "ignite/example-cache.xml", true);
>
> try (final Ignite ignite = igniteContext.ignite();
>      final IgniteCache<String, RDD<Row>> cache =
>          ignite.getOrCreateCache("outpatient")) {
>
>     final RDD<Row> rdd = df.rdd();
>     cache.put("rdd1", rdd);
>
>     final RDD<Row> rdd1 = cache.get("rdd1");
>     final DataFrame dataFrameFromCache =
>         sqlContext.createDataFrame(rdd1.toJavaRDD(), df.schema());
>     System.out.println(dataFrameFromCache.count());
> } finally {
>     igniteContext.close();
> }
>
> But this gives me the following exception:
>
> Exception in thread "main" org.apache.spark.SparkException: RDD
> transformations and actions can only be invoked by the driver, not inside of
> other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is
> invalid because the values transformation and count action cannot be
> performed inside of the rdd1.map transformation. For more information, see
> SPARK-5063.
> at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
> at org.apache.spark.rdd.RDD.map(RDD.scala:317)
> at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:445)
> at org.apache.spark.sql.S
>
> I assume this is caused by the Spark driver not knowing anything about the RDD
> that was retrieved from the cache?
> Is there a suggested strategy to cache a DataFrame with Ignite?
>
> Thanks!
> Dima
>
> On 2 February 2016 at 22:03, vkulichenko <[email protected]>
> wrote:
>
>> Hi Dmitry,
>>
>> Ignite provides better data distribution and better performance if there
>> are more partitions than nodes in the topology. 1024 is the default number
>> of partitions, but you can change it by providing a custom affinity
>> function configuration:
>>
>> CacheConfiguration cfg = new CacheConfiguration("hello-world-cache")
>>     .setAffinity(new RendezvousAffinityFunction(false, 32)); // 32 partitions instead of 1024.
>> final IgniteRDD igniteRDD = igniteContext.fromCache(cfg);
>>
>> You can try this and see if it gets better.
>>
>> Actually, I think that methods like isEmpty should be overridden in
>> IgniteRDD to use the native IgniteCache API, it will be much faster. I
>> created a ticket for this task [1], feel free to provide your comments
>> there. Are there any other methods that should be optimized?
>>
>> [1] https://issues.apache.org/jira/browse/IGNITE-2538
>>
>> -Val
>>
>>
>> --
>> View this message in context:
>> http://apache-ignite-users.70518.x6.nabble.com/Sharing-Spark-RDDs-with-Ignite-tp2805p2808.html
>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>
>
> --
> Kind regards,
> Dima

--
Andrey Gura
GridGain Systems, Inc.
www.gridgain.com
