Hi Guys,
I am coding a simple application with Ignite and Spark, UT works well, but when
make assembly fat jar and run with Spark, I got this error.
It looks IgniteConfiguration can't serializable, but in IgniteContext, Once
variable, this type value marked as @transient, it should not need to serialize.
Version is 1.7.0
Exception:
Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:226)
at com.tr.IgniteSparkApps$.main(IgniteSparkApps.scala:38)
at com.tr.IgniteSparkApps.main(IgniteSparkApps.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.ignite.configuration.IgniteConfiguration
Serialization stack:
- object not serializable (class:
org.apache.ignite.configuration.IgniteConfiguration, value: IgniteConfiguration
[gridName=null, pubPoolSize=64, callbackPoolSize=64, sysPoolSize=64,
mgmtPoolSize=4, igfsPoolSize=32, utilityCachePoolSize=64,
utilityCacheKeepAliveTime=10000, marshCachePoolSize=64,
marshCacheKeepAliveTime=10000, p2pPoolSize=2, igniteHome=null,
igniteWorkDir=null, mbeanSrv=null, nodeId=null, marsh=null, marshLocJobs=false,
daemon=false, p2pEnabled=false, netTimeout=5000, sndRetryDelay=1000,
sndRetryCnt=3, clockSyncSamples=8, clockSyncFreq=120000, metricsHistSize=10000,
metricsUpdateFreq=2000, metricsExpTime=9223372036854775807,
discoSpi=TcpDiscoverySpi [addrRslvr=null, sockTimeout=5000, ackTimeout=5000,
reconCnt=10, maxAckTimeout=600000, forceSrvMode=false,
clientReconnectDisabled=false], segPlc=STOP, segResolveAttempts=2,
waitForSegOnStart=true, allResolversPassReq=true, segChkFreq=10000,
commSpi=null, evtSpi=null, colSpi=null, deploySpi=null, swapSpaceSpi=null,
indexingSpi=null, addrRslvr=null, clientMode=true, rebalanceThreadPoolSize=1,
txCfg=org.apache.ignite.configuration.TransactionConfiguration@25b74370,
cacheSanityCheckEnabled=true, discoStartupDelay=60000, deployMode=SHARED,
p2pMissedCacheSize=100, locHost=null, timeSrvPortBase=31100,
timeSrvPortRange=100, failureDetectionTimeout=10000, metricsLogFreq=60000,
hadoopCfg=null,
connectorCfg=org.apache.ignite.configuration.ConnectorConfiguration@15e8c040,
odbcCfg=null, warmupClos=null, atomicCfg=AtomicConfiguration
[seqReserveSize=1000, cacheMode=PARTITIONED, backups=0], classLdr=null,
sslCtxFactory=null, platformCfg=null, binaryCfg=null, lateAffAssignment=true])
- field (class: com.tr.IgniteSparkApps$$anonfun$1, name: igniteConf$1,
type: class org.apache.ignite.configuration.IgniteConfiguration)
- object (class com.tr.IgniteSparkApps$$anonfun$1, <function0>)
- field (class: org.apache.ignite.spark.Once, name: clo, type:
interface scala.Function0)
- object (class org.apache.ignite.spark.Once,
org.apache.ignite.spark.Once@7eae55)
- field (class: org.apache.ignite.spark.IgniteContext, name: cfgClo,
type: class org.apache.ignite.spark.Once)
- object (class org.apache.ignite.spark.IgniteContext,
org.apache.ignite.spark.IgniteContext@6f2d3391)
- field (class: org.apache.ignite.spark.impl.IgniteAbstractRDD, name:
ic, type: class org.apache.ignite.spark.IgniteContext)
- object (class org.apache.ignite.spark.IgniteRDD, IgniteRDD[0] at RDD
at IgniteAbstractRDD.scala:32)
- field (class: org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1,
name: $outer, type: class org.apache.ignite.spark.IgniteRDD)
- object (class org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1,
<function1>)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
here is my code:
val sc = new SparkContext(new SparkConf().setAppName("IgniteSparkApp"))
val igniteConf = new IgniteConfiguration
val disco = new TcpDiscoverySpi()
val ipFinder = new TcpDiscoveryMulticastIpFinder
ipFinder.setMulticastGroup("228.10.10.157")
disco.setIpFinder(ipFinder)
igniteConf.setDiscoverySpi(disco)
igniteConf.setClientMode(true)
val igniteContext = new IgniteContext(sc, () => igniteConf)
for (cache <- igniteContext.ignite().cacheNames().asScala) {
igniteContext.ignite().destroyCache(cache)
logInfo(s"SUCCESS Destroy:${cache}")
}
val sharedRdd = igniteContext.fromCache[String, Row]("persist_Fundamental")
if (sharedRdd.isEmpty()) {
import com.tr.hbase.HBaseContext
val sc: HBaseContext = igniteContext
val rdd = sc.hbase("Fundamental")
logInfo(s"start saving RDD:${rdd.getClass.getName}")
sharedRdd.savePairs(rdd, true)
}
Thanks.
Ming