Hi guys,
Thanks for pointing me to the right direction; Provided links about binary
marshaller was quite useful;
Unfortunately i was not able to achieve my final goal; I get a
GridDhtPartitionsExchangeFuture when i call cache.savePairs method; Here is
a full stack:
scala> cache.savePairs(pairRdd)
16/03/07 05:03:09 ERROR GridDhtPartitionsExchangeFuture: Failed to
reinitialize local partitions (preloading will be stopped):
GridDhtPartitionExchangeId [topVer=AffinityTopologyVersion [topVer=1,
minorTopVer=1], nodeId=3bbda0cf, evt=DISCOVERY_CUSTOM_EVT]
class org.apache.ignite.IgniteCheckedException: Failed to initialize
property 'id' for key class 'class java.lang.Object' and value class
'interface org.apache.ignite.binary.BinaryObject'. Make sure that one of
these classes contains respective getter method or field.
at
org.apache.ignite.internal.processors.query.GridQueryProcessor.buildClassProperty(GridQueryProcessor.java:1638)
at
org.apache.ignite.internal.processors.query.GridQueryProcessor.processClassMeta(GridQueryProcessor.java:1517)
at
org.apache.ignite.internal.processors.query.GridQueryProcessor.initializeCache(GridQueryProcessor.java:279)
at
org.apache.ignite.internal.processors.query.GridQueryProcessor.onCacheStart(GridQueryProcessor.java:462)
at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCache(GridCacheProcessor.java:1039)
at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:1649)
at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCachesStart(GridCacheProcessor.java:1564)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.startCaches(GridDhtPartitionsExchangeFuture.java:961)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:524)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)
16/03/07 05:03:09 ERROR GridCachePartitionExchangeManager: Runtime error
caught during grid runnable execution: GridWorker [name=partition-exchanger,
gridName=null, finished=false, isCancelled=false, hashCode=628489503,
interrupted=false, runner=exchange-worker-#47%null%]
java.lang.NullPointerException
at
org.apache.ignite.internal.processors.cache.GridCacheProcessor.onExchangeDone(GridCacheProcessor.java:1724)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:1114)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:88)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:336)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:878)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:1297)
at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)
[Stage 2:> (0 + 2)
/ 2]
Code example:
import scala.util.{Try}
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.binary.BinaryObjectBuilder
import org.apache.ignite.cache._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import java.util.LinkedHashMap
import scala.collection.mutable.LinkedHashMap
val queryEntity = new QueryEntity()
queryEntity.setKeyType(classOf[Long].getName)
queryEntity.setValueType(classOf[BinaryObject].getName)
val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", classOf[String].getName)
fields.put("name", classOf[String].getName)
queryEntity.setFields(fields)
queryEntity.setIndexes(List(new QueryIndex("id"), new
QueryIndex("name")).asJava)
val cacheCfg = new CacheConfiguration[Long, BinaryObject]()
cacheCfg.setName("cache01")
cacheCfg.setQueryEntities(List(queryEntity).asJava)
val igniteContext = new IgniteContext[Long, BinaryObject](sc, () => new
IgniteConfiguration(), false)
val cache = igniteContext.fromCache(cacheCfg)
>>cache:
org.apache.ignite.spark.IgniteRDD[Long,org.apache.ignite.binary.BinaryObject]
= IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:27
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize
at <console>:47
scala> val pairRdd = rdd.map(x => {
| val builder = igniteContext.ignite.binary.builder("DT1")
| builder.setField("id", x.toString)
| builder.setField("name", "test-" + x.toString)
| val binObj = builder.build
| binObj
| }).zipWithIndex.map(r => (r._2, r._1))
pairRdd: org.apache.spark.rdd.RDD[(Long,
org.apache.ignite.binary.BinaryObject)] = MapPartitionsRDD[5] at map at
<console>:57
cache.savePairs(pairRdd)
Probably you have any ideas which can help me to overcome this problem;
Thank you and best regards,
Dmitry
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/index-and-query-org-apache-ignite-spark-IgniteRDD-String-org-apache-spark-sql-Row-tp3343p3382.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.