Re: Serialization error when using scala kernel with Jupyter
collect() returns the contents of the RDD back to the Driver in a local variable. Where is the local variable? Try:

val result = rdd.map(x => x + 1).collect()

(see the short example after this message)

regards,
Apostolos

On 21/2/20 21:28, Nikhil Goyal wrote:
Hi all,
I am trying to use the almond Scala kernel to run a Spark session on Jupyter. I am using Scala version 2.12.8. I am creating the Spark session with master set to Yarn. This is the code:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
rdd.map(x => x + 1).collect()

Exception:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

I was wondering if anyone has seen this before.
Thanks
Nikhil

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol
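To make the point above concrete, here is a minimal sketch (standard Spark API, values taken from the thread) showing that collect() materializes the RDD as a local Array held on the driver:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
val result: Array[Int] = rdd.map(x => x + 1).collect()   // Array(2, 3, 5), held on the driver
result.foreach(println)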
Serialization error when using scala kernel with Jupyter
Hi all,
I am trying to use the almond Scala kernel to run a Spark session on Jupyter. I am using Scala version 2.12.8. I am creating the Spark session with master set to Yarn. This is the code:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
rdd.map(x => x + 1).collect()

Exception:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

I was wondering if anyone has seen this before.
Thanks
Nikhil
Re: Serialization error - sql UDF related
You define "getNewColumnName" as method, which requires the class/object holding it has to be serializable. >From the stack trace, it looks like this method defined in >ProductDimensionSFFConverterRealApp, but it is not serializable. In fact, your method only uses String and Boolean, which are serializable by default. So you can change the definition to function, instead of method, which should work. Yong From: Darshan Pandya <darshanpan...@gmail.com> Sent: Friday, February 17, 2017 10:36 PM To: user Subject: Serialization error - sql UDF related Hello, I am getting the famous serialization exception on running some code as below, val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String); val charReference: DataFrame = thinLong.select("char_name_id", "char_name").withColumn("columnNameInDimTable", correctColNameUDF(col("char_name"))).withColumn("applicable_dimension", lit(dimension).cast(StringType)).distinct(); val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference""" val tableName: String = charReferenceTableName.toString charReference.saveAsTable(tableName, saveMode) I think it has something to do with the UDF, so I am pasting the UDF function as well def getNewColumnName(oldColName: String, appendID: Boolean): String = { var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No") return newColName; } Exception seen is Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2066) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 
73 more Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$ Serialization stack: - object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411) - field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits) - object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, ) - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, ) - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3)) - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304) - element of array (index: 2) - array (class [Ljava.lang.Object;, size 4) - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305)) - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2,char_name#3,UDF(char_name#3) AS columnNameInDimTable#304,PRODUCT AS applicable_dimension#305] -- Sincerely, Darshan
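A minimal sketch of Yong's suggestion (identifiers reused from the thread, everything else assumed): defining the transformation as a function value means only the lambda itself is serialized, without dragging the enclosing object into the closure.

import org.apache.spark.sql.functions.udf

// Instead of:  def getNewColumnName(oldColName: String, appendID: Boolean): String = ...
// use a function value, which does not require the enclosing object to be serializable:
val getNewColumnName: (String, Boolean) => String = (oldColName, appendID) =>
  oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")

val correctColNameUDF = udf((name: String) => getNewColumnName(name, false))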
Re: Serialization error - sql UDF related
Hi Darshan,

When you get org.apache.spark.SparkException: Task not serializable, it means that you are using a reference to an instance of a non-serializable class inside a transformation (a short sketch of this pattern follows this message). Hope the following link will help.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

Regards,
Vaquar khan

On Fri, Feb 17, 2017 at 9:36 PM, Darshan Pandya wrote:
> Hello,
>
> I am getting the famous serialization exception on running some code as below,
>
> val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String);
> val charReference: DataFrame = thinLong.select("char_name_id", "char_name")
>   .withColumn("columnNameInDimTable", correctColNameUDF(col("char_name")))
>   .withColumn("applicable_dimension", lit(dimension).cast(StringType))
>   .distinct();
> val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
> val tableName: String = charReferenceTableName.toString
> charReference.saveAsTable(tableName, saveMode)
>
> I think it has something to do with the UDF, so I am pasting the UDF function as well
>
> def getNewColumnName(oldColName: String, appendID: Boolean): String = {
>   var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")
>   return newColName;
> }
>
> Exception seen is
>
> Caused by: org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> ... 73 more
> Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
> Serialization stack:
> - object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
> - field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
> - object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, )
> - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
> - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3))
> - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304)
> - element of array (index: 2)
> - array (class [Ljava.lang.Object;, size 4)
> - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
> - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
> - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2,char_name#3,UDF(char_name#3) AS columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]
>
> --
> Sincerely,
> Darshan

--
Regards,
Vaquar Khan
+1-224-436-0783
IT Architect / Lead Consultant
Greater Chicago
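To illustrate the pattern Vaquar describes, a hedged, self-contained sketch (class and field names are made up, not taken from Darshan's code): referencing a member of a non-serializable enclosing class inside a transformation pulls the whole instance into the closure, while copying the value into a local val first avoids that.

import org.apache.spark.sql.DataFrame

class ColumnRenamer {                                  // note: not Serializable
  val suffix = "_pct"

  def failing(df: DataFrame) =
    df.rdd.map(r => r.getString(0) + suffix)           // uses this.suffix -> captures `this` -> Task not serializable

  def working(df: DataFrame) = {
    val localSuffix = suffix                           // copy the field into a local val
    df.rdd.map(r => r.getString(0) + localSuffix)      // the closure now captures only a String
  }
}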
Serialization error - sql UDF related
Hello,

I am getting the famous serialization exception on running some code as below,

val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String);
val charReference: DataFrame = thinLong.select("char_name_id", "char_name")
  .withColumn("columnNameInDimTable", correctColNameUDF(col("char_name")))
  .withColumn("applicable_dimension", lit(dimension).cast(StringType))
  .distinct();
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so I am pasting the UDF function as well

def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")
  return newColName;
}

Exception seen is

Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 73 more
Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
- object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
- field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
- object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, )
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 4)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2,char_name#3,UDF(char_name#3) AS columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]

--
Sincerely,
Darshan
serialization error
Hi, having some problems with the piece of code I inherited:

the error messages I get are:

the code runs if I exclude the following line:

any help appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: serialization error
Attachments didn't go through. Mind using pastebin to show the code / error ?

Thanks

On Mon, Oct 19, 2015 at 3:01 PM, daze5112 <david.zeel...@ato.gov.au> wrote:
> Hi having some problems with the piece of code I inherited:
>
> the error messages i get are:
>
> the code runs if i exclude the following line:
>
> any help appreciated.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
Re: serialization error
That particular line used an object which did not implement "Serializable".

On Tue, Oct 20, 2015 at 9:01 AM, daze5112 <david.zeel...@ato.gov.au> wrote:
> Hi having some problems with the piece of code I inherited:
>
> the error messages i get are:
>
> the code runs if i exclude the following line:
>
> any help appreciated.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/serialization-error-tp25131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

--
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 | f: 02 9376 0730 | m: 0433221979
Serialization Error with PartialFunction / immutable sets
Hi,

I’m receiving a task not serializable exception using Spark GraphX (Scala 2.11.6 / JDK 1.8 / Spark 1.5).

My vertex data is of type (VertexId, immutable Set), and my edge data is of type PartialFunction[ISet[E], ISet[E]] where each ED has a precomputed function.

My vertex program:

val vertexProgram = (id: VertexId, currentSet: ISet[E], inSet: ISet[E]) => inSet   // (identity)

My send message:

val sendMessage: (EdgeTriplet[ISet[E], MonotonicTransferFunction]) => Iterator[(VertexId, ISet[E])] =
  (edge) => {
    val f = edge.attr
    val currentSet = edge.srcAttr
    Iterator((edge.dstId, f(currentSet)))
  }

My message combiner:

val messageCombiner: (ISet[E], ISet[E]) => ISet[E] = (a, b) => a ++ b

g.pregel(bottom, Int.MaxValue, EdgeDirection.Out)(vp, send, combine)

I debugged the pregel execution and found that the exception happened when pregel calls mapReduceTriplets to aggregate the messages for the first time. This happens after the initial vertex program is run, I believe (which does not cause an exception). I think the error lies within my send/combiner functions but I am not sure. I’ve also tried storing the PartialFunctions inside of the VD instead and still get the same error.

At first I thought the error might have to do with Set and how it changes size throughout execution, but I have successfully run other Pregel projects using immutable sets without issue… I have also tried enclosing each method within its own class that extends Serializable but this still gives me the same error.

Thank you for your time and information.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
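Not from the thread, but a small debugging sketch that may help narrow this down: Spark ships closures (and anything they capture) with Java serialization, so one quick check is to try Java-serializing the suspect function or attribute directly on the driver and see which object the resulting NotSerializableException names. The helper name below is hypothetical.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Throws java.io.NotSerializableException, naming the offending object, if
// `value` (or anything it transitively references) cannot be Java-serialized.
def checkSerializable(value: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(value) finally oos.close()
}

// e.g. checkSerializable(sendMessage), or checkSerializable(someEdgeAttr)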
Task serialization error for mllib.MovieLensALS
I run the MovieLensALS example, but meet the following error. The weird thing is that this issue only appears under OpenJDK. This is based on 1.5. I found several related tickets, but I am not sure whether anyone else has met the same issue and knows the solution. Thanks.

https://issues.apache.org/jira/browse/SPARK-4672
https://issues.apache.org/jira/browse/SPARK-4838

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
scala.collection.immutable.$colon$colon.writeObject(List.scala:379)

--
Best Regards

Jeff Zhang
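The thread does not resolve this, but for reference here is a hedged sketch of two mitigations commonly suggested for a serialization-time StackOverflowError caused by a long lineage (paths and sizes below are illustrative assumptions, not from the post): give the serializer threads a larger stack, and set a checkpoint directory so ALS can periodically cut the lineage it has to serialize.

// 1) Larger thread stacks for driver and executors (sizes are illustrative):
//    spark-submit ... \
//      --driver-java-options "-Xss16m" \
//      --conf spark.executor.extraJavaOptions=-Xss16m

// 2) Let ALS checkpoint intermediate RDDs so the serialized lineage stays short:
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // hypothetical path

val model = new org.apache.spark.mllib.recommendation.ALS()
  .setRank(10)
  .setIterations(20)
  .setCheckpointInterval(5)   // if your MLlib version exposes it; cuts the lineage every 5 iterations
  .run(ratings)               // `ratings` is an RDD[Rating], as in the MovieLensALS example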
Re: Task Serialization Error on DataFrame.foreachPartition
> private HTable table;

You should declare table variable within apply() method. BTW which hbase release are you using ?

I see you implement caching yourself. You can make use of the following HTable method:

  public void setWriteBufferSize(long writeBufferSize) throws IOException

Cheers

On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel <nishant.k.pa...@gmail.com> wrote:

Hi,

Please find code as below.

dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

  private HTable table;
  private char ROWKEY_SEPERATOR = '\u';

  public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
    Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", );
    config.set("hbase.zookeeper.property.clientPort", ???);
    config.set("zookeeper.znode.parent", );
    try {
      table = new HTable(config, table_name);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    List<Put> puts = new ArrayList<Put>();
    try {
      while (rows.hasNext()) {
        Row row = rows.next();
        Map<String, Object> map = new HashMap<String, Object>();
        String[] fieldNames = row.schema().fieldNames();
        for (int i = 0; i < fieldNames.length; i++) {
          map.put(fieldNames[i].toUpperCase(), row.get(i));
        }
        puts.add(mapToPut(map));
        if (puts.size() >= 500) {
          table.put(puts);
          puts.clear();
        }
      }
      table.put(puts);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return BoxedUnit.UNIT;
  }

  private Put mapToPut(Map<String, Object> map) throws IOException {
    try {
      Put put = new Put(getRowKey(map));
      String value = null;
      for (String key : map.keySet()) {
        value = (map.get(key) == null ? "" : map.get(key).toString());
        put.add(Bytes.toBytes(0), Bytes.toBytes(key), Bytes.toBytes(value));
      }
      return put;
    } catch (Exception e) {
      e.printStackTrace();
      throw e;
    }
  }

  private byte[] getRowKey(Map<String, Object> map) {
    StringBuilder builder = new StringBuilder();
    return Bytes.toBytes(builder.toString());
  }
});

Regards,
Nishant

On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Can you show us the code for loading Hive into hbase ? There shouldn't be 'return' statement in that code.

Cheers

On Jun 20, 2015, at 10:10 PM, Nishant Patel <nishant.k.pa...@gmail.com> wrote:

Hi,

I am loading data from Hive table to Hbase after doing some manipulation. I am getting error as 'Task not Serializable'. My code is as below.

public class HiveToHbaseLoader implements Serializable {

  public static void main(String[] args) throws Exception {
    String hbaseTableName = args[0];
    String hiveQuery = args[1];

    SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader").setMaster();
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc.sc());
    hiveContext.setConf("hive.metastore.uris", ?);

    DataFrame dataFrame = hiveContext.sql(hiveQuery);

    dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
      // Logic to load row from hive to Hbase.
    });
  }
}

Getting error as below.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at
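For reference, a hedged sketch (in Scala for brevity; `df`, the table name, quorum and buffer size are placeholders) of the advice above: construct the non-serializable HTable inside foreachPartition so it never has to be shipped, and let the HBase client batch writes via the write buffer instead of hand-rolled batching.

df.foreachPartition { rows =>
  val config = org.apache.hadoop.hbase.HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "zk-host")                               // placeholder
  val table = new org.apache.hadoop.hbase.client.HTable(config, "table_name")   // placeholder
  table.setAutoFlush(false, true)                 // buffer mutations client-side
  table.setWriteBufferSize(4L * 1024 * 1024)      // the HTable method mentioned above
  try {
    rows.foreach { row =>
      // build a Put from `row` and call table.put(put)
    }
  } finally {
    table.flushCommits()
    table.close()
  }
}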
Re: Task Serialization Error on DataFrame.foreachPartition
Can you show us the code for loading Hive into hbase ? There shouldn't be 'return' statement in that code.

Cheers

On Jun 20, 2015, at 10:10 PM, Nishant Patel <nishant.k.pa...@gmail.com> wrote:

Hi,

I am loading data from Hive table to Hbase after doing some manipulation. I am getting error as 'Task not Serializable'. My code is as below.

public class HiveToHbaseLoader implements Serializable {

  public static void main(String[] args) throws Exception {
    String hbaseTableName = args[0];
    String hiveQuery = args[1];

    SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader").setMaster();
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc.sc());
    hiveContext.setConf("hive.metastore.uris", ?);

    DataFrame dataFrame = hiveContext.sql(hiveQuery);

    dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
      // Logic to load row from hive to Hbase.
    });
  }
}

Getting error as below.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1
Serialization stack:
- object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: function1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more

--
Regards,
Nishant
Task Serialization Error on DataFrame.foreachPartition
Hi,

I am loading data from Hive table to Hbase after doing some manipulation. I am getting error as 'Task not Serializable'. My code is as below.

public class HiveToHbaseLoader implements Serializable {

  public static void main(String[] args) throws Exception {
    String hbaseTableName = args[0];
    String hiveQuery = args[1];

    SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader").setMaster();
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc.sc());
    hiveContext.setConf("hive.metastore.uris", ?);

    DataFrame dataFrame = hiveContext.sql(hiveQuery);

    dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
      // Logic to load row from hive to Hbase.
    });
  }
}

Getting error as below.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
Caused by: java.io.NotSerializableException: com.philips.bda.HiveToHbaseLoader$1
Serialization stack:
- object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, value: function1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more

--
Regards,
Nishant
Re: Serialization error
The arguments are values of it. The name of the argument is important, and all you need to do is specify those when you're creating the SparkConf object. Glad it worked.

On Tue, Apr 28, 2015 at 5:20 PM, madhvi <madhvi.gu...@orkash.com> wrote:

Thank you Deepak. It worked.
Madhvi

On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this ?

On Tue, Apr 28, 2015 at 11:11 AM, madhvi <madhvi.gu...@orkash.com> wrote:

Hi,

While connecting to accumulo through spark by making sparkRDD I am getting the following error:

object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark

Thanks
Madhvi

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Deepak

--
Deepak
Re: Serialization error
Thank you Deepak. It worked.
Madhvi

On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this ?

On Tue, Apr 28, 2015 at 11:11 AM, madhvi <madhvi.gu...@orkash.com> wrote:

Hi,

While connecting to accumulo through spark by making sparkRDD I am getting the following error:

object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark

Thanks
Madhvi

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Deepak
Re: Serialization error
val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this ?

On Tue, Apr 28, 2015 at 11:11 AM, madhvi <madhvi.gu...@orkash.com> wrote:

Hi,

While connecting to accumulo through spark by making sparkRDD I am getting the following error:

object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark

Thanks
Madhvi

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Deepak
Re: Serialization error
On Tuesday 28 April 2015 01:39 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:

val conf = new SparkConf()
  .setAppName("detail")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
  .registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key]))

Can you try this ?

On Tue, Apr 28, 2015 at 11:11 AM, madhvi <madhvi.gu...@orkash.com> wrote:

Hi,

While connecting to accumulo through spark by making sparkRDD I am getting the following error:

object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'key' class of accumulo which does not implement serializable interface. How it can be solved and accumulo can be used with spark

Thanks
Madhvi

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Deepak

Hi Deepak,

The snippet you provide is Scala but I am working in Java. I am trying the same thing in Java, but please can you specify in detail what the parameters you mentioned in it are, such as 'arguments'.

Thanks
Madhvi
Serialization error
Hi,

While connecting to Accumulo through Spark by making a sparkRDD I am getting the following error:

object not serializable (class: org.apache.accumulo.core.data.Key)

This is due to the 'Key' class of Accumulo, which does not implement the Serializable interface. How can it be solved so that Accumulo can be used with Spark?

Thanks
Madhvi

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org