One more point to add: in my case the query will start from an executor node, not the driver node.
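To make that concrete, here is a minimal sketch of what an executor-side lookup can look like: each Spark task lazily starts (or reuses) a client node in its executor JVM via Ignition.getOrStart, so the query never touches the driver. The cache name ("MyCache"), value class (EntityVO), and field (canonicalName) follow the thread below; the EntityVO definition here is an assumption for illustration, and this is an untested sketch, not a verified implementation:

```scala
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.ignite.cache.query.SqlQuery
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.IgniteConfiguration
import scala.collection.JavaConverters._

// Hypothetical value class; the real EntityVO lives in the legacy code.
// The @QuerySqlField annotation makes canonicalName queryable via SQL.
case class EntityVO(id: String,
                    @(QuerySqlField @scala.annotation.meta.field)(index = true)
                    canonicalName: String)

object ExecutorSideLookup {
  // Runs inside a Spark task, i.e. on an executor, never on the driver.
  def lookup(entityName: String): Vector[EntityVO] = {
    Ignition.setClientMode(true)
    // getOrStart reuses the node already embedded in this executor JVM,
    // so only the first task per executor pays the startup cost.
    val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())
    val cache: IgniteCache[String, EntityVO] = ignite.cache("MyCache")
    val qry = new SqlQuery[String, EntityVO](classOf[EntityVO], "canonicalName = ?")
      .setArgs(entityName)
    cache.query(qry).getAll.asScala.map(_.getValue).toVector
  }
}
```

Calling this from inside rdd.map or rdd.mapPartitions keeps all cache traffic executor-local, which is the behavior described above.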
On Tue, Feb 14, 2017 at 2:42 PM, Ranjit Sahu <[email protected]> wrote:
> Hi Val,
>
> Let me explain it again. What I am looking for is to build the cache for
> each Spark application and destroy it when the Spark app completes,
> something like what you have with IgniteRDD in embedded mode. I can't use
> IgniteRDD as we are getting into a nested-RDD situation with our legacy
> code.
>
> I changed my code a little bit now. Please have a look and let me know if
> this looks ok.
>
> 1) Start an Ignite server node on each worker:
>
>     val workers = sparkContext.getConf.getInt("spark.executor.instances",
>       sparkContext.getExecutorStorageStatus.length)
>
>     // Start an Ignite server node on each worker.
>     sparkContext.parallelize(1 to workers, workers)
>       .foreachPartition(it => ignite())
>
>     def ignite(): Ignite = {
>       val cfg = new IgniteConfiguration()
>       val ignite: Ignite = Ignition.start(cfg)
>       logInfo("Starting ignite node")
>       ignite
>     }
>
> 2) Load the data:
>
>     val dataframe = sqlContext.read
>       .format("com.databricks.spark.avro")
>       .option("header", "true")
>       .load(sparkConf.get("spark.data.avroFiles"))
>
>     val entityRDD = dataframe.map(/* deserialize and save */)
>
> 3) Load each partition into the cache:
>
>     entityRDD.mapPartitions(x => {
>       val cfg = new IgniteConfiguration()
>       Ignition.setClientMode(true)
>       val ignite: Ignite = Ignition.getOrStart(cfg)
>       val orgCacheCfg: CacheConfiguration[String, EntityVO] =
>         new CacheConfiguration[String, EntityVO](MyCache)
>       orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
>       orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
>       // Note: a second, argument-less setIndexedTypes() call was removed
>       // here; it would have reset the indexed types configured above.
>       val cache: IgniteCache[String, EntityVO] =
>         ignite.getOrCreateCache(orgCacheCfg)
>       while (x.hasNext) {
>         val entityvo = x.next()
>         cache.put(entityvo.Id, entityvo)
>       }
>       x
>     }).count()
>
> 4) Use the cache for lookups:
>
>     entityNamesRDD.map(entityName => {
>       val cfg = new IgniteConfiguration()
>       Ignition.setClientMode(true)
>       val ignite: Ignite = Ignition.getOrStart(cfg)
>       val cacheConfig: CacheConfiguration[String, EntityVO] =
>         new CacheConfiguration[String, EntityVO](MyCache)
>       cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
>       val wcaIgniteCache: IgniteCache[String, EntityVO] =
>         ignite.getOrCreateCache(cacheConfig)
>       val queryString = "canonicalName = ?"
>       val companyNameQuery =
>         new SqlQuery[String, EntityVO]("EntityVO", queryString)
>           .setArgs(entityName)
>       val results = wcaIgniteCache.query(companyNameQuery).getAll()
>       val listIter = results.listIterator()
>       val compResults = ListBuffer[EntityVO]()
>       while (listIter.hasNext) {
>         val compObject = listIter.next()
>         if (compObject.getValue.isInstanceOf[EntityVO])
>           compResults += compObject.getValue.asInstanceOf[EntityVO]
>       }
>       compResults.toVector
>     }).collect().foreach(println)
>
> Thanks,
> Ranjit
>
> On Tue, Feb 14, 2017 at 3:08 AM, vkulichenko <[email protected]> wrote:
>
>> Hi Ranjit,
>>
>> Not sure I understood. The main problem with executors is that they are
>> controlled by Spark and they are created per application. So you can't
>> share the data stored in embedded mode, and it's not really safe to store
>> it there. This can be useful only for some simple tests/demos, but not
>> for real apps. Let me know if I'm missing something in your use case.
>>
>> -Val
>>
>> --
>> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10607.html
>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
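On the "destroy it when the Spark app completes" part of the question: one possible approach, sketched under the assumption that each executor embeds its own Ignite node as in step 1 above, is to register a JVM shutdown hook in each executor so the node stops when the executor exits at application end. This is a sketch of one option, not the thread's confirmed solution:

```scala
import org.apache.ignite.Ignition

// Run once per executor JVM, right after Ignition.start()/getOrStart().
// When Spark tears the executor down at application completion, the hook
// fires and stops every Ignite node embedded in this JVM.
sys.addShutdownHook {
  // cancel = true: don't wait for in-flight cache operations to finish.
  Ignition.stopAll(true)
}
```

Since the cache only lives inside the application's own executors, stopping the nodes this way also discards the cached data, which matches the per-application lifecycle described above.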
