Hi Guys,

I am coding a simple application with Ignite and Spark, UT works well, but when 
make assembly fat jar and run with Spark, I got this error.

It looks IgniteConfiguration can't serializable, but in IgniteContext, Once 
variable, this type value marked as @transient, it should not need to serialize.

Version is 1.7.0
Exception:
Exception in thread "main" org.apache.spark.SparkException: Task not 
serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:226)
        at com.tr.IgniteSparkApps$.main(IgniteSparkApps.scala:38)
        at com.tr.IgniteSparkApps.main(IgniteSparkApps.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: 
org.apache.ignite.configuration.IgniteConfiguration
Serialization stack:
        - object not serializable (class: 
org.apache.ignite.configuration.IgniteConfiguration, value: IgniteConfiguration 
[gridName=null, pubPoolSize=64, callbackPoolSize=64, sysPoolSize=64, 
mgmtPoolSize=4, igfsPoolSize=32, utilityCachePoolSize=64, 
utilityCacheKeepAliveTime=10000, marshCachePoolSize=64, 
marshCacheKeepAliveTime=10000, p2pPoolSize=2, igniteHome=null, 
igniteWorkDir=null, mbeanSrv=null, nodeId=null, marsh=null, marshLocJobs=false, 
daemon=false, p2pEnabled=false, netTimeout=5000, sndRetryDelay=1000, 
sndRetryCnt=3, clockSyncSamples=8, clockSyncFreq=120000, metricsHistSize=10000, 
metricsUpdateFreq=2000, metricsExpTime=9223372036854775807, 
discoSpi=TcpDiscoverySpi [addrRslvr=null, sockTimeout=5000, ackTimeout=5000, 
reconCnt=10, maxAckTimeout=600000, forceSrvMode=false, 
clientReconnectDisabled=false], segPlc=STOP, segResolveAttempts=2, 
waitForSegOnStart=true, allResolversPassReq=true, segChkFreq=10000, 
commSpi=null, evtSpi=null, colSpi=null, deploySpi=null, swapSpaceSpi=null, 
indexingSpi=null, addrRslvr=null, clientMode=true, rebalanceThreadPoolSize=1, 
txCfg=org.apache.ignite.configuration.TransactionConfiguration@25b74370, 
cacheSanityCheckEnabled=true, discoStartupDelay=60000, deployMode=SHARED, 
p2pMissedCacheSize=100, locHost=null, timeSrvPortBase=31100, 
timeSrvPortRange=100, failureDetectionTimeout=10000, metricsLogFreq=60000, 
hadoopCfg=null, 
connectorCfg=org.apache.ignite.configuration.ConnectorConfiguration@15e8c040, 
odbcCfg=null, warmupClos=null, atomicCfg=AtomicConfiguration 
[seqReserveSize=1000, cacheMode=PARTITIONED, backups=0], classLdr=null, 
sslCtxFactory=null, platformCfg=null, binaryCfg=null, lateAffAssignment=true])
        - field (class: com.tr.IgniteSparkApps$$anonfun$1, name: igniteConf$1, 
type: class org.apache.ignite.configuration.IgniteConfiguration)
        - object (class com.tr.IgniteSparkApps$$anonfun$1, <function0>)
        - field (class: org.apache.ignite.spark.Once, name: clo, type: 
interface scala.Function0)
        - object (class org.apache.ignite.spark.Once, 
org.apache.ignite.spark.Once@7eae55)
        - field (class: org.apache.ignite.spark.IgniteContext, name: cfgClo, 
type: class org.apache.ignite.spark.Once)
        - object (class org.apache.ignite.spark.IgniteContext, 
org.apache.ignite.spark.IgniteContext@6f2d3391)
        - field (class: org.apache.ignite.spark.impl.IgniteAbstractRDD, name: 
ic, type: class org.apache.ignite.spark.IgniteContext)
        - object (class org.apache.ignite.spark.IgniteRDD, IgniteRDD[0] at RDD 
at IgniteAbstractRDD.scala:32)
        - field (class: org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1, 
name: $outer, type: class org.apache.ignite.spark.IgniteRDD)
        - object (class org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1, 
<function1>)
        at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

here is my code:

val sc = new SparkContext(new SparkConf().setAppName("IgniteSparkApp"))

val igniteConf = new IgniteConfiguration
val disco = new TcpDiscoverySpi()
val ipFinder = new TcpDiscoveryMulticastIpFinder
ipFinder.setMulticastGroup("228.10.10.157")
disco.setIpFinder(ipFinder)
igniteConf.setDiscoverySpi(disco)
igniteConf.setClientMode(true)

val igniteContext = new IgniteContext(sc, () => igniteConf)

for (cache <- igniteContext.ignite().cacheNames().asScala) {
    igniteContext.ignite().destroyCache(cache)
    logInfo(s"SUCCESS Destroy:${cache}")
}

val sharedRdd = igniteContext.fromCache[String, Row]("persist_Fundamental")
if (sharedRdd.isEmpty()) {
    import com.tr.hbase.HBaseContext
    val sc: HBaseContext = igniteContext
    val rdd = sc.hbase("Fundamental")
    logInfo(s"start saving RDD:${rdd.getClass.getName}")
    sharedRdd.savePairs(rdd, true)
}

Thanks.
Ming

Reply via email to