Serialization issue when using Spark 3.1.2 with Hadoop YARN
My Spark job fails with this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (davben-lubuntu executor 2): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

My OS is Linux Ubuntu 20, organized as follows: I have two users, /home/davben and /home/hadoop. Under the hadoop user I have installed Hadoop 3.1 and spark-3.1.2-hadoop3.2. Both users refer to the same java-8-openjdk installation.
The Spark job is launched by user davben from the Eclipse IDE as follows. First I create the Spark conf and the Spark session:

    System.setProperty("hadoop.home.dir", "/home/hadoop/hadoop");
    SparkConf sparkConf = new SparkConf()
        .setAppName("simple")
        .setMaster("yarn")
        .set("spark.executor.memory", "1g")
        .set("deploy.mode", "cluster")
        .set("spark.yarn.stagingDir", "hdfs://localhost:9000/user/hadoop/")
        .set("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
        .set("spark.hadoop.yarn.resourcemanager.hostname", "localhost")
        .set("spark.hadoop.yarn.resourcemanager.scheduler.address", "localhost:8030")
        .set("spark.hadoop.yarn.resourcemanager.address ", "localhost:8032")
        .set("spark.hadoop.yarn.resourcemanager.webapp.address", "localhost:8088")
        .set("spark.hadoop.yarn.resourcemanager.admin.address", "localhost:8083");
    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

Then I create a dataset with two entries:

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("a", "b"));
    rows.add(RowFactory.create("a", "a"));
    StructType structType = new StructType();
    structType = structType.add("edge_1", DataTypes.StringType, false);
    structType = structType.add("edge_2", DataTypes.StringType, false);
    ExpressionEncoder<Row> edgeEncoder = RowEncoder.apply(structType);
    Dataset<Row> edge = spark.createDataset(rows, edgeEncoder);

Then I print the content of the current dataset edge:

    edge.show();

Then I perform a map transformation on edge that upper-cases the values of the two entries and returns the result in edge2:

    Dataset<Row> edge2 = edge.map(new MyFunction2(), edgeEncoder);

The following is the code of MyFunction2:

    public class MyFunction2 implements MapFunction<Row, Row>, scala.Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Row v1) throws Exception {
            String el1 = v1.get(0).toString().toUpperCase();
            String el2 = v1.get(1).toString().toUpperCase();
            return RowFactory.create(el1, el2);
        }
    }

Finally I show the content of edge2:

    edge2.show();

I can confirm, by checking the Hadoop UI at localhost:8088, that the job is submitted correctly. What sounds strange is that the first show() returns correctly in my console, but the second one fails with the error mentioned above.
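A common cause of this particular SerializedLambda ClassCastException is that the executors never receive the jar containing the job's classes, which can happen when the session is created directly from an IDE instead of going through spark-submit. A minimal, hedged sketch of shipping the application jar along with the conf above (the jar path is hypothetical and the application has to be packaged first):

    SparkConf sparkConf = new SparkConf()
            .setAppName("simple")
            .setMaster("yarn")
            .set("spark.executor.memory", "1g")
            // Ship the jar that contains MyFunction2 (and the rest of the job's
            // classes) so the executors can deserialize the closure. Path is hypothetical.
            .setJars(new String[] { "/home/davben/workspace/simple/target/simple.jar" });
    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

Note also that when the driver runs inside the IDE the deploy mode is effectively client; the usual configuration key for it is spark.submit.deployMode rather than deploy.mode.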
BiMap BroadCast Variable - Kryo Serialization Issue
Hi, I am getting Nullpointer exception due to Kryo Serialization issue, while trying to read a BiMap broadcast variable. Attached is the code snippets. Pointers shared here didn't help - link1 <http://stackoverflow.com/questions/33156095/spark-serialization-issue-with-hashmap>, link2 <http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist>. Spark version used is 1.6.x, but this was working with 1.3.x version. Any help in this regard is much appreciated. Exception: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException App > Serialization trace: App > value (com.demo.BiMapWrapper) App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238) App > at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) App > at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) App > at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) App > at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) App > at com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39) App > at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) App > at scala.collection.Iterator$class.foreach(Iterator.scala:727) App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) App > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) App > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) App > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) App > at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) App > at scala.collection.AbstractIterator.to(Iterator.scala:1157) App > at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) App > at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) App > at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) App > at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) App > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978) App > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978) App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) App > at org.apache.spark.scheduler.Task.run(Task.scala:89) App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) App > at java.lang.Thread.run(Thread.java:745) App > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException App > Serialization trace: App > value (com.demo.BiMapWrapper) App > at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) App > at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) App > at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) App > at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217) App > at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178) App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231) App > ... 29 more App > Caused by: java.lang.NullPointerException App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180) App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230) App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218) App > at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) App > at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17) App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) App > at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) App > ... 35 more App > App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager: Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal, partition 17,PROCESS_LOCAL, 2076 bytes) App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task 17.1 in stage
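One workaround sometimes suggested for Kryo choking on Guava's HashBiMap is to register the class with Kryo's JavaSerializer, so the map is round-tripped through standard Java serialization instead of Kryo's MapSerializer (which rebuilds the map with put() before its internal state is ready, matching the NullPointerException in the trace above). A hedged sketch, assuming HashBiMap's own java.io.Serializable support is acceptable and that the registrator is then passed via spark.kryo.registrator:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.serializers.JavaSerializer;
    import com.google.common.collect.HashBiMap;
    import org.apache.spark.serializer.KryoRegistrator;

    public class BiMapRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            // Fall back to Java serialization for the BiMap inside the broadcast.
            kryo.register(HashBiMap.class, new JavaSerializer());
        }
    }

    // driver side:
    // conf.set("spark.kryo.registrator", "com.demo.BiMapRegistrator");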
Spark, Kryo Serialization Issue with ProtoBuf field
Hi, I am seeing an error when running my Spark job relating to serialization of a protobuf field when transforming an RDD.

    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)

The error seems to be created at this point:

    val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map {
      tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o)).mapPartitions(
        it => it.map { ord => (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry)) }))
    }

    val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
      opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
    }

    val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
      opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
    }

The field is a list specified in the protobuf as below:

    otherAuthors_ = java.util.Collections.emptyList()

As you can see, the code is not actually utilising that field from the Book protobuf, although it is still being transmitted over the network. Has anyone got any advice on this? Thanks, K
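One way around Kryo's FieldSerializer tripping over the immutable Collections.emptyList() inside the generated protobuf class is to serialize the message through its own wire format. A rough sketch, assuming the Kryo 2/3 API shipped with Spark 1.x and using Book.DBBooks as a stand-in for the generated class named in the trace:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.thomsonreuters.kraken.medusa.dbor.proto.Book;
    import org.apache.spark.serializer.KryoRegistrator;

    public class ProtoRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(Book.DBBooks.class, new Serializer<Book.DBBooks>() {
                @Override
                public void write(Kryo k, Output output, Book.DBBooks message) {
                    // Write the protobuf bytes, prefixed with their length.
                    byte[] bytes = message.toByteArray();
                    output.writeInt(bytes.length, true);
                    output.writeBytes(bytes);
                }

                @Override
                public Book.DBBooks read(Kryo k, Input input, Class<Book.DBBooks> type) {
                    int length = input.readInt(true);
                    try {
                        return Book.DBBooks.parseFrom(input.readBytes(length));
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to parse DBBooks message", e);
                    }
                }
            });
        }
    }

    // driver side: conf.set("spark.kryo.registrator", "ProtoRegistrator")

If the kryo-serializers add-on library is available on the classpath, its ready-made protobuf serializer could be registered instead of hand-rolling one.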
Re: Serialization issue with Spark
You have not mentioned what task is not serializable. Including the stack trace is usually a good idea when asking this question: Spark will usually tell you which class it is not able to serialize. If it is one of your own classes, then try making it serializable, or mark the offending field transient so that it only gets created on the executor. ...Manas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26595.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
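For reference, the "make it transient so that it only gets created on the executor" suggestion usually looks something like the sketch below; SomeNonSerializableHelper is a hypothetical stand-in for whatever class Spark refuses to serialize:

    import org.apache.spark.api.java.function.Function;

    public class NormalizeFunction implements Function<String, String> {
        // Excluded from the serialized task; rebuilt lazily on the executor.
        private transient SomeNonSerializableHelper helper;

        @Override
        public String call(String value) throws Exception {
            if (helper == null) {
                helper = new SomeNonSerializableHelper(); // created on the executor JVM
            }
            return helper.normalize(value);
        }
    }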
Re: Serialization issue with Spark
Hello Hafsa, a 'Task not serializable' exception usually means that you are trying to use an object, defined in the driver, in code that runs on the workers. Can you post the code that is generating this error here, so we can better advise you? Cheers. 2016-03-23 14:14 GMT-03:00 Hafsa Asif <hafsa.a...@matchinguu.com>: > Can anyone please help me with this issue? > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26579.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Serialization issue with Spark
Can anyone please help me in this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization issue with Spark
Can you show code snippet and the exception for 'Task is not serializable' ? Please see related JIRA: SPARK-10251 whose pull request contains code for registering classes with Kryo. Cheers On Tue, Mar 22, 2016 at 7:00 AM, Hafsa Asif <hafsa.a...@matchinguu.com> wrote: > Hello, > I am facing Spark serialization issue in Spark (1.4.1 - Java Client) with > Spring Framework. It is known that Spark needs serialization and it > requires > every class need to be implemented with java.io.Serializable. But, in the > documentation link: http://spark.apache.org/docs/latest/tuning.html, it is > mentioned that it is not a good approach and better to use Kryo. > I am using Kryo in Spark configuration like this: > public @Bean DeepSparkContext sparkContext(){ > DeepSparkConfig conf = new DeepSparkConfig(); > conf.setAppName(this.environment.getProperty("APP_NAME")) > .setMaster(master) > .set("spark.executor.memory", > this.environment.getProperty("SPARK_EXECUTOR_MEMORY")) > .set("spark.cores.max", > this.environment.getProperty("SPARK_CORES_MAX")) > .set("spark.default.parallelism", > this.environment.getProperty("SPARK_DEFAULT_PARALLELISM")); > conf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer"); > return new DeepSparkContext(conf); > } > > but still getting exception in Spark that 'Task is not serializable'. I > also > donot want to make spark contect 'static'. > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
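For completeness, the class-registration pattern from the pull request linked above looks roughly like the sketch below (MyDomainClass is a placeholder). Note that registering classes with Kryo affects data serialization only; the 'Task is not serializable' message comes from closure serialization, so the enclosing Spring bean must still not be captured by the function that runs on the executors.

    SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // Register application classes so Kryo does not fall back to writing
            // full class names (and so spark.kryo.registrationRequired can be enabled).
            .registerKryoClasses(new Class<?>[] { MyDomainClass.class });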
Serialization issue with Spark
Hello, I am facing a Spark serialization issue in Spark (1.4.1 - Java client) with the Spring Framework. It is known that Spark needs serialization and requires every class to implement java.io.Serializable. But the documentation (http://spark.apache.org/docs/latest/tuning.html) mentions that this is not a good approach and that it is better to use Kryo. I am using Kryo in the Spark configuration like this:

    public @Bean DeepSparkContext sparkContext() {
        DeepSparkConfig conf = new DeepSparkConfig();
        conf.setAppName(this.environment.getProperty("APP_NAME"))
            .setMaster(master)
            .set("spark.executor.memory", this.environment.getProperty("SPARK_EXECUTOR_MEMORY"))
            .set("spark.cores.max", this.environment.getProperty("SPARK_CORES_MAX"))
            .set("spark.default.parallelism", this.environment.getProperty("SPARK_DEFAULT_PARALLELISM"));
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        return new DeepSparkContext(conf);
    }

but I am still getting the 'Task is not serializable' exception in Spark. I also do not want to make the Spark context static. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
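Independently of Kryo, the usual fix for 'Task is not serializable' in a Spring service is to make sure the function shipped to the executors does not capture the surrounding bean (anonymous inner classes and non-static nested classes keep a reference to their enclosing instance). A minimal, hedged sketch with hypothetical names:

    import org.apache.spark.api.java.function.Function;

    // Top-level (or static nested) class: it captures nothing from the Spring
    // context, so only this small object is serialized with the task.
    public class ToUpperCase implements Function<String, String> {
        @Override
        public String call(String value) {
            return value.toUpperCase();
        }
    }

    // inside the service method:
    // JavaRDD<String> upper = lines.map(new ToUpperCase());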
DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
Hi, I'm using Spark 1.3.1 built against Hadoop 1.0.4 and Java 1.7, and I'm trying to save my data frame to Parquet. The issue I'm stuck on looks like serialization is trying to do a pretty weird thing: it tries to write into an empty array. The last (through the stack trace) line of Spark code that leads to the exception is in the method SerializationDebugger.visitSerializable(o: Object, stack: List[String]): List[String]:

    desc.getObjFieldValues(finalObj, objFieldValues)

The reason it fails is that finalObj is org.apache.spark.sql.execution.Project and objFieldValues is an empty array! As a result there are two fields to read from the Project instance (this happens in java.io.ObjectStreamClass), but there is an empty array to read into. A little bit of code with debug info:

    private def visitSerializable(o: Object, stack: List[String]): List[String] = {
      val (finalObj, desc) = findObjectAndDescriptor(o)  // finalObj: Project, desc: org.apache.spark.sql.execution.Project
      val slotDescs = desc.getSlotDescs                  // java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project]
      var i = 0                                          // i: 0
      while (i < slotDescs.length) {
        val slotDesc = slotDescs(i)                      // slotDesc: org.apache.spark.sql.execution.SparkPlan
        if (slotDesc.hasWriteObjectMethod) {
          // TODO: Handle classes that specify writeObject method.
        } else {
          val fields: Array[ObjectStreamField] = slotDesc.getFields                        // fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
          val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields) // objFieldValues: java.lang.Object[0]
          val numPrims = fields.length - objFieldValues.length                             // numPrims: 1
          desc.getObjFieldValues(finalObj, objFieldValues)                                 // leads to the exception

So it looks like it gets the objFieldValues array from the SparkPlan object, but uses it to receive values from the Project object.
Here is the schema of my data frame root |-- Id: long (nullable = true) |-- explodes: struct (nullable = true) ||-- Identifiers: array (nullable = true) |||-- element: struct (containsNull = true) ||||-- Type: array (nullable = true) |||||-- element: string (containsNull = true) |-- Identifiers: struct (nullable = true) ||-- Type: array (nullable = true) |||-- element: string (containsNull = true) |-- Type2: string (nullable = true) |-- Type: string (nullable = true) Actual stack trace is: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887) at com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
You've probably hit this bug: https://issues.apache.org/jira/browse/SPARK-7180 It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to false and see if it goes away. On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com wrote: Hi, I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm trying to save my data frame to parquet. The issue I'm stuck looks like serialization tries to do pretty weird thing: tries to write to an empty array. The last (through stack trace) line of spark code that leads to exception is in method SerializationDebugger.visitSerializable(o: Object, stack: List[String]): List[String]. desc.getObjFieldValues(finalObj, objFieldValues) The reason it does so, is because finalObj is org.apache.spark.sql.execution.Project and objFieldValues is an empty array! As a result there are two fields to read from the Project instance object (happens in java.io.ObjectStreamClass), but there is an empty array to read into. A little bit of code with debug info: private def visitSerializable(o: Object, stack: List[String]): List[String] = { val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project, desc: org.apache.spark.sql.execution.Project val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project] var i = 0 //i: 0 while (i slotDescs.length) { val slotDesc = slotDescs(i) //slotDesc: org.apache.spark.sql.execution.SparkPlan if (slotDesc.hasWriteObjectMethod) { // TODO: Handle classes that specify writeObject method. } else { val fields: Array[ObjectStreamField] = slotDesc.getFields //fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled] val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields) //objFieldValues: java.lang.Object[0] val numPrims = fields.length - objFieldValues.length //numPrims: 1 desc.getObjFieldValues(finalObj, objFieldValues) //leads to exception So it looks like it gets objFieldValues array from the SparkPlan object, but uses it to receive values from Project object. 
Here is the schema of my data frame root |-- Id: long (nullable = true) |-- explodes: struct (nullable = true) ||-- Identifiers: array (nullable = true) |||-- element: struct (containsNull = true) ||||-- Type: array (nullable = true) |||||-- element: string (containsNull = true) |-- Identifiers: struct (nullable = true) ||-- Type: array (nullable = true) |||-- element: string (containsNull = true) |-- Type2: string (nullable = true) |-- Type: string (nullable = true) Actual stack trace is: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887) at com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68) Caused by:
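If upgrading is not an option, the workaround mentioned in the reply above can be applied directly on the driver's configuration; a one-line sketch:

    SparkConf conf = new SparkConf()
            // Disables the SerializationDebugger that triggers SPARK-7180;
            // the original "Task not serializable" message is still reported.
            .set("spark.serializer.extraDebugInfo", "false");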
serialization issue
Hi, I am using Spark 1.4 and have run into an issue. I am trying to use the aggregate function:

    JavaRDD<String> rdd = some rdd;
    HashMap<Long, TypeA> zeroValue = new HashMap();
    // add initial key-value pair for zeroValue
    rdd.aggregate(zeroValue,
        new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>(){ /* implementation */ },
        new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>(){ /* implementation */ })

Here is the stack trace when I run the application:

    Caused by: java.lang.ClassNotFoundException: TypeA
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at java.util.HashMap.readObject(HashMap.java:1180)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
    at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
    at org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
    at org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)

However, I have checked that TypeA is in the jar file which is in the classpath. And when I use an empty HashMap as the zeroValue, the exception is gone.
Re: serialization issue
While submitting the job, you can use --jars, --driver-classpath etc configurations to add the jar. Apart from that if you are running the job as a standalone application, then you can use the sc.addJar option to add the jar (which will ship this jar into all the executors) Regards, Anish On 8/13/15, 周千昊 qhz...@apache.org wrote: Hi, I am using spark 1.4 when an issue occurs to me. I am trying to use the aggregate function: JavaRddString rdd = some rdd; HashMapLong, TypeA zeroValue = new HashMap(); // add initial key-value pair for zeroValue rdd.aggregate(zeroValue, new Function2HashMapLong, TypeA, String, HashMapLong, TypeA(){//implementation}, new Function2HashMapLong, TypeA, String, HashMapLong, TypeA(){//implementation}) here is the stack trace when i run the application: Caused by: java.lang.ClassNotFoundException: TypeA at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at java.util.HashMap.readObject(HashMap.java:1180) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89) at org.apache.spark.util.Utils$.clone(Utils.scala:1458) at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047) at org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413) at org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47) *however I have checked that TypeA is in the jar file which is in the classpath* *And when I use an empty HashMap as the zeroValue, the exception has gone* - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
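The two suggestions above, sketched with hypothetical paths:

    // 1) At submit time, ship the jar that contains TypeA to the executors:
    //    spark-submit --class com.example.MyJob --jars /path/to/typea.jar myapp.jar
    // 2) Or, in a standalone application, add it programmatically:
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.addJar("/path/to/typea.jar");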
Re: serialization issue with mapPartitions
You cannot pass your jobConf object inside any of the transformation function in spark (like map, mapPartitions, etc.) since org.apache.hadoop.mapreduce.Job is not Serializable. You can use KryoSerializer (See this doc http://spark.apache.org/docs/latest/tuning.html#data-serialization), We usually converts the JobConf into ByteArray and pass over the byteArray object inside the map and from there we creates the jobConf (new variable) with the data inside byteArray object. I should rephrase my question as follows: How to use the corresponding Hadoop Configuration of a HadoopRDD in defining a function as an input parameter to the MapPartitions function? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-with-mapPartitions-tp20858p20865.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
serialization issue with mapPartitions
Hi, I got some issues with mapPartitions with the following piece of code:

    val sessions = sc
      .newAPIHadoopFile(
        ... path to an avro file ...,
        classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
        classOf[AvroKey[ByteBuffer]],
        classOf[NullWritable],
        job.getConfiguration())
      .mapPartitions { valueIterator =>
        val config = job.getConfiguration()
        ...
      }
      .collect()

Why does job.getConfiguration() inside the mapPartitions function generate the following message?

    Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job

If I take out 'val config = job.getConfiguration()' in the mapPartitions, the code works fine, even though job.getConfiguration() also shows up in newAPIHadoopFile(). Ey-Chih Chow -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-with-mapPartitions-tp20858.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: serialization issue with mapPartitions
Hi, On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote: I got some issues with mapPartitions with the following piece of code: val sessions = sc .newAPIHadoopFile( ... path to an avro file ..., classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]], classOf[AvroKey[ByteBuffer]], classOf[NullWritable], job.getConfiguration()) .mapPartitions { valueIterator = val config = job.getConfiguration() . } .collect() Why job.getConfiguration() in the function mapPartitions will generate the following message? Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job The functions inside mapPartitions() will be executed on the Spark executors, not the Spark driver. Therefore, the function body needs to be serialized and sent to the executors via network. If that is not possible (in your case, `job` cannot be serialized), you will get a NotSerializableException. It works inside newAPIHadoopFile because this is executed on the driver. Tobias
Re: serialization issue with mapPartitions
I should rephrase my question as follows: How to use the corresponding Hadoop Configuration of a HadoopRDD in defining a function as an input parameter to the MapPartitions function? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-with-mapPartitions-tp20858p20861.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: serialization issue with mapPartitions
Hi, On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote: I should rephrase my question as follows: How to use the corresponding Hadoop Configuration of a HadoopRDD in defining a function as an input parameter to the MapPartitions function? Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable. Tobias
RE: serialization issue with mapPartitions
Hi, Hadoop Configuration is only Writable, not Java Serializable. You can use SerializableWritable (in Spark) to wrap the Configuration to make it serializable, and use broadcast variable to broadcast this conf to all the node, then you can use it in mapPartitions, rather than serialize it within closure. You can refer to org.apache.spark.rdd.HadoopRDD, there is a similar usage scenario like yours. Thanks Jerry. From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Friday, December 26, 2014 9:38 AM To: ey-chih chow Cc: user Subject: Re: serialization issue with mapPartitions Hi, On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.commailto:eyc...@hotmail.com wrote: I should rephrase my question as follows: How to use the corresponding Hadoop Configuration of a HadoopRDD in defining a function as an input parameter to the MapPartitions function? Well, you could try to pull the `val config = job.getConfiguration()` out of the function and just use `config` inside the function, hoping that this one is serializable. Tobias
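A rough Java sketch of the SerializableWritable-plus-broadcast approach described above, assuming a JavaSparkContext named jsc and a Spark 1.x version where org.apache.spark.SerializableWritable is a public developer API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SerializableWritable;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    // On the driver: wrap the (Writable, not Serializable) Configuration and broadcast it.
    Configuration hadoopConf = job.getConfiguration();
    Broadcast<SerializableWritable<Configuration>> confBroadcast =
            jsc.broadcast(new SerializableWritable<>(hadoopConf));

    // Inside mapPartitions, on the executor:
    // Configuration conf = confBroadcast.value().value();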
Re:Re: Serialization issue when using HBase with Spark
.map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization issue when using HBase with Spark
Just point out a bug in your codes. You should not use `mapPartitions` like that. For details, I recommend Section setup() and cleanup() in Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ Best Regards, Shixiong Zhu 2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com: In #1, class HTable can not be serializable. You also need to check you self defined function getUserActions and make sure it is a member function of one class who implement serializable interface. 发自我的 iPad 在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道: The scenario is using HTable instance to scan multiple rowkey range in Spark tasks look likes below: Option 1: val users = input .map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
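A rough Java sketch of the setup()/cleanup() pattern from the blog post above: the HTable is created once per partition, the partition is consumed eagerly, and the table is closed only after the last element has been read (in the original code, table.close() runs before the lazy iterator.map is ever evaluated). getUserActions, timeStart and timeStop come from the original post; the String result type and the Spark 1.x FlatMapFunction signature (returning an Iterable) are assumptions:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import scala.Tuple2;

    // Define this in a static method or top-level class so it does not capture
    // the enclosing instance and reintroduce a serialization problem.
    JavaRDD<Tuple2<String, String>> actions = userIds.mapPartitions(
        new FlatMapFunction<Iterator<String>, Tuple2<String, String>>() {
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> it) throws Exception {
                Configuration conf = HBaseConfiguration.create();   // setup, once per partition
                HTable table = new HTable(conf, "actions");
                List<Tuple2<String, String>> out = new ArrayList<>();
                try {
                    while (it.hasNext()) {
                        String userId = it.next();
                        out.add(new Tuple2<>(userId, getUserActions(table, userId, timeStart, timeStop)));
                    }
                } finally {
                    table.close();                                   // cleanup, after the partition is consumed
                }
                return out;
            }
        });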
Re: Serialization issue when using HBase with Spark
The reason not using sc.newAPIHadoopRDD is it only support one scan each time. I am not sure is that's true. You can use multiple scans as following: val scanStrings = scans.map(scan = convertScanToString(scan)) conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*) where convertScanToString is implemented as: /** * Serializes a HBase scan into string. * @param scan Scan to serialize. * @return Base64 encoded serialized scan. */ private def convertScanToString(scan: Scan) = { val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } Thanks, Aniket On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu zsxw...@gmail.com wrote: Just point out a bug in your codes. You should not use `mapPartitions` like that. For details, I recommend Section setup() and cleanup() in Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ Best Regards, Shixiong Zhu 2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com: In #1, class HTable can not be serializable. You also need to check you self defined function getUserActions and make sure it is a member function of one class who implement serializable interface. 发自我的 iPad 在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道: The scenario is using HTable instance to scan multiple rowkey range in Spark tasks look likes below: Option 1: val users = input .map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization issue when using HBase with Spark
In #1, class HTable can not be serializable. You also need to check you self defined function getUserActions and make sure it is a member function of one class who implement serializable interface. 发自我的 iPad 在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道: The scenario is using HTable instance to scan multiple rowkey range in Spark tasks look likes below: Option 1: val users = input .map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Serialization issue when using HBase with Spark
The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like below.

Option 1:

    val users = input
      .map { case (deviceId, uid) => uid }.distinct().sortBy(x => x).mapPartitions(iterator => {
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "actions")
        val result = iterator.map { userId =>
          (userId, getUserActions(table, userId, timeStart, timeStop))
        }
        table.close()
        result
      })

But got the exception:

    org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
    ...
    Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each time:

    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

And if using MultiTableInputFormat, it is not possible for the driver to put all rowkeys into the HBaseConfiguration.

Option 2:

    sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

It may be possible to divide all rowkey ranges into several parts and then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Serialization issue when using HBase with Spark
Can you paste the complete code? it looks like at some point you are passing a hadoop's configuration which is not Serializable. You can look at this thread for similar discussion http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-td13378.html Thanks Best Regards On Fri, Dec 12, 2014 at 2:05 PM, yangliuyu yangli...@163.com wrote: The scenario is using HTable instance to scan multiple rowkey range in Spark tasks look likes below: Option 1: val users = input .map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
serialization issue in case of case class is more than 1
Hi, I am a newbie in Spark and performed the following steps during a POC:

1. Map a CSV file to an object file, after some transformations, once.
2. Load the object file back into an RDD for operations, as needed.

With 2 CSV/object files, the first object file is loaded into an RDD successfully, but an error appears while loading the second object file. This error occurs only when spark-shell is restarted between step 1 and step 2. Please suggest how to work with 2 object files. Also find below the code executed on spark-shell:

    //#1// Start spark-shell; CSV to object-file creation
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    case class person(id: Int, name: String, fathername: String, officeid: Int)
    val baseperson = sc.textFile("person_csv").flatMap(line => line.split("\n")).map(_.split(","))
    baseperson.map(p => person(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).saveAsObjectFile("person_obj")
    case class office(id: Int, name: String, landmark: String, areacode: String)
    val baseoffice = sc.textFile("office_csv").flatMap(line => line.split("\n")).map(_.split(","))
    baseoffice.map(p => office(p(0).trim.toInt, p(1), p(2), p(3))).saveAsObjectFile("office_obj")

    //#2// Stop spark-shell

    //#3// Start spark-shell and load the object files
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    case class person(id: Int, name: String, fathername: String, officeid: Int)
    case class office(id: Int, name: String, landmark: String, areacode: String)
    sc.objectFile[person]("person_obj").count   [OK]
    sc.objectFile[office]("office_obj").count   *[FAILS]*

The stack trace is attached: stacktrace.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n20334/stacktrace.txt rahul@... Regards, Rahul -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-issue-in-case-of-case-class-is-more-than-1-tp20334.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Java API - Serialization Issue
This worked great. Thanks a lot -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-API-Serialization-Issue-tp1460p3178.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Java API - Serialization Issue
I am also facing the same problem. I have implemented Serializable for my code, but the exception is thrown from third-party libraries over which I have no control.

    Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: (lib class name here)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

Is it mandatory that Serializable must be implemented for dependent jars as well? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-API-Serialization-Issue-tp1460p3086.html Sent from the Apache Spark User List mailing list archive at Nabble.com.