Hi guys,

Thanks for pointing me in the right direction; the links you provided about
the binary marshaller were quite useful.
Unfortunately, I was not able to achieve my final goal: I get an error from
GridDhtPartitionsExchangeFuture when I call the cache.savePairs method. Here
is the full stack trace:

scala> cache.savePairs(pairRdd)
16/03/07 05:03:09 ERROR GridDhtPartitionsExchangeFuture: Failed to
reinitialize local partitions (preloading will be stopped):
GridDhtPartitionExchangeId [topVer=AffinityTopologyVersion [topVer=1,
minorTopVer=1], nodeId=3bbda0cf, evt=DISCOVERY_CUSTOM_EVT]
class org.apache.ignite.IgniteCheckedException: Failed to initialize
property 'id' for key class 'class java.lang.Object' and value class
'interface org.apache.ignite.binary.BinaryObject'. Make sure that one of
these classes contains respective getter method or field.
        at
org.apache.ignite.internal.processors.query.GridQueryProcessor.buildClassProperty(GridQueryProcessor.java:1638)
        at
org.apache.ignite.internal.processors.query.GridQueryProcessor.processClassMeta(GridQueryProcessor.java:1517)
        at
org.apache.ignite.internal.processors.query.GridQueryProcessor.initializeCache(GridQueryProcessor.java:279)
        at
org.apache.ignite.internal.processors.query.GridQueryProcessor.onCacheStart(GridQueryProcessor.java:462)
        at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCache(GridCacheProcessor.java:1039)
        at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:1649)
        at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCachesStart(GridCacheProcessor.java:1564)
        at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.startCaches(GridDhtPartitionsExchangeFuture.java:961)
        at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:524)
        at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
        at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
        at java.lang.Thread.run(Thread.java:745)
16/03/07 05:03:09 ERROR GridCachePartitionExchangeManager: Runtime error
caught during grid runnable execution: GridWorker [name=partition-exchanger,
gridName=null, finished=false, isCancelled=false, hashCode=628489503,
interrupted=false, runner=exchange-worker-#47%null%]
java.lang.NullPointerException
        at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.onExchangeDone(GridCacheProcessor.java:1724)
        at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:1114)
        at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:88)
        at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:336)
        at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:878)
        at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
        at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
        at java.lang.Thread.run(Thread.java:745)
[Stage 2:>                                                          (0 + 2)
/ 2]


Code example:

import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache._
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import scala.collection.JavaConverters._


val queryEntity = new QueryEntity()
queryEntity.setKeyType(classOf[Long].getName)
queryEntity.setValueType(classOf[BinaryObject].getName)

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", classOf[String].getName)
fields.put("name", classOf[String].getName)

queryEntity.setFields(fields)
queryEntity.setIndexes(List(new QueryIndex("id"), new QueryIndex("name")).asJava)


val cacheCfg = new CacheConfiguration[Long, BinaryObject]()
cacheCfg.setName("cache01")
cacheCfg.setQueryEntities(List(queryEntity).asJava)

val igniteContext = new IgniteContext[Long, BinaryObject](sc, () => new IgniteConfiguration(), false)

val cache = igniteContext.fromCache(cacheCfg)
cache: org.apache.ignite.spark.IgniteRDD[Long,org.apache.ignite.binary.BinaryObject] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:27


scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:47

scala> val pairRdd = rdd.map(x => {
     |     val builder = igniteContext.ignite.binary.builder("DT1")
     |     builder.setField("id", x.toString)
     |     builder.setField("name", "test-" + x.toString)
     |     val binObj = builder.build
     |     binObj
     | }).zipWithIndex.map(r => (r._2, r._1))
pairRdd: org.apache.spark.rdd.RDD[(Long, org.apache.ignite.binary.BinaryObject)] = MapPartitionsRDD[5] at map at <console>:57

cache.savePairs(pairRdd)
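
One possible reading of the error, stated as an assumption and not a confirmed diagnosis: the query entity declares the BinaryObject interface as its value type, and that interface has no 'id' field or getter. The error also reports the key class as java.lang.Object, which may be because classOf[Long].getName yields the primitive name "long" in Scala. An untested sketch of the same configuration using the binary type name passed to the builder ("DT1") as the value type, and the boxed java.lang.Long as the key type:

```scala
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.{QueryEntity, QueryIndex}
import org.apache.ignite.configuration.CacheConfiguration
import scala.collection.JavaConverters._

val queryEntity = new QueryEntity()
// Assumption: use the boxed key class name ("java.lang.Long") instead of "long".
queryEntity.setKeyType(classOf[java.lang.Long].getName)
// Assumption: use the binary type name given to binary.builder("DT1"),
// not the BinaryObject interface name.
queryEntity.setValueType("DT1")

// Queryable fields, in declaration order.
val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", classOf[String].getName)
fields.put("name", classOf[String].getName)
queryEntity.setFields(fields)

queryEntity.setIndexes(List(new QueryIndex("id"), new QueryIndex("name")).asJava)

val cacheCfg = new CacheConfiguration[Long, BinaryObject]()
cacheCfg.setName("cache01")
cacheCfg.setQueryEntities(List(queryEntity).asJava)
```

Whether this removes the exception on this Ignite version has not been checked here; the rest of the example (IgniteContext creation, building pairRdd, savePairs) would stay unchanged.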


Do you have any ideas that could help me overcome this problem?

Thank you and best regards,
Dmitry

--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/index-and-query-org-apache-ignite-spark-IgniteRDD-String-org-apache-spark-sql-Row-tp3343p3382.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
