Serialization issue when using Spark3.1.2 with hadoop yarn

2021-10-03 Thread davvy benny
My spark Job fails with this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3) (davben-lubuntu executor 2): java.lang.ClassCastException: cannot assign 
instance of java.lang.invoke.SerializedLambda to field 
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of 
org.apache.spark.rdd.MapPartitionsRDD

My Linux Ubuntu 20 machine is organized as follows: I have two users, /home/davben 
and /home/hadoop. Under the hadoop user I have installed Hadoop 3.1 and 
spark-3.1.2-hadoop3.2. Both users point to the same java-8-openjdk installation. 
The Spark job is launched by user davben from the Eclipse IDE as follows. First I 
create the Spark conf and the Spark session:

System.setProperty("hadoop.home.dir", "/home/hadoop/hadoop");
SparkConf sparkConf = new SparkConf()
.setAppName("simple")
.setMaster("yarn")
.set("spark.executor.memory", "1g")
.set("deploy.mode", "cluster")
.set("spark.yarn.stagingDir", "hdfs://localhost:9000/user/hadoop/") 
.set("spark.hadoop.fs.defaultFS","hdfs://localhost:9000") 
.set("spark.hadoop.yarn.resourcemanager.hostname","localhost") 
.set("spark.hadoop.yarn.resourcemanager.scheduler.address","localhost:8030") 
.set("spark.hadoop.yarn.resourcemanager.address ","localhost:8032") 
.set("spark.hadoop.yarn.resourcemanager.webapp.address","localhost:8088") 
.set("spark.hadoop.yarn.resourcemanager.admin.address","localhost:8083")
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Then I create a dataset with two entries:

List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create("a", "b"));
rows.add(RowFactory.create("a", "a"));
StructType structType = new StructType();
structType = structType.add("edge_1", DataTypes.StringType, false);
structType = structType.add("edge_2", DataTypes.StringType, false);
ExpressionEncoder<Row> edgeEncoder = RowEncoder.apply(structType);
Dataset<Row> edge = spark.createDataset(rows, edgeEncoder);


Then I print the content of the current dataset edge

 edge.show();
Then I perform a map transformation on edge that upper-cases the values of the 
two entries and returns the result in edge2:

Dataset<Row> edge2 = edge.map(new MyFunction2(), edgeEncoder);

The following is the code of MyFunction2:

public class MyFunction2 implements MapFunction<Row, Row>, scala.Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Row call(Row v1) throws Exception {
        String el1 = v1.get(0).toString().toUpperCase();
        String el2 = v1.get(1).toString().toUpperCase();
        return RowFactory.create(el1, el2);
    }
}
Finally I show the content of edge2

edge2.show();
I can confirm, by checking the Hadoop UI at localhost:8088, that the job is 
submitted correctly. What seems strange is that the first show() is returned 
correctly in my console, while the second one fails with the error mentioned 
above.




BiMap BroadCast Variable - Kryo Serialization Issue

2016-11-02 Thread Kalpana Jalawadi
Hi,

I am getting a NullPointerException due to a Kryo serialization issue while
trying to read a BiMap broadcast variable. Attached are the code snippets.
The pointers shared here didn't help - link1
<http://stackoverflow.com/questions/33156095/spark-serialization-issue-with-hashmap>,
link2
<http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist>.
The Spark version used is 1.6.x, but this was working with version 1.3.x.

Any help in this regard is much appreciated.

Exception:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238)
App > at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
App > at
com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39)
App > at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
App > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
App > at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
App > at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
App > at scala.collection.AbstractIterator.to(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
App > at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
App > at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
App > at java.lang.Thread.run(Thread.java:745)
App > Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
App > at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
App > at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
App > at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
App > ... 29 more
App > Caused by: java.lang.NullPointerException
App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
App > ... 35 more
App >
App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager:
Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task
17.1 in stage

Spark, Kryo Serialization Issue with ProtoBuf field

2016-07-13 Thread Nkechi Achara
Hi,

I am seeing an error when running my spark job relating to Serialization of
a protobuf field when transforming an RDD.

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException Serialization trace: otherAuthors_
(com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)

The error seems to be created at this point:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier =>
  (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o))
    .mapPartitions(it =>
      it.map(ord => (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry)))))
}

val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}

The field is a list specified in the protobuf as the below:

otherAuthors_ = java.util.Collections.emptyList()

As you can see the code is not actually utilising that field from the Book
Protobuf, although it still is being transmitted over the network.

Has anyone got any advice on this?

Thanks,

K


Re: Serialization issue with Spark

2016-03-25 Thread manasdebashiskar
You have not mentioned which task is not serializable.
The stack trace is usually a good idea when asking this question.

Usually Spark will tell you which class it is not able to serialize.
If it is one of your own classes, try making it serializable, or make the
offending field transient so that it only gets created on the executor.
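For illustration, here is a minimal sketch of the transient approach; the class
and field names are hypothetical stand-ins, not taken from the original post:

// A hypothetical dependency that is not serializable.
class NonSerializableClient { def lookup(s: String): String = s.toUpperCase }

class RecordEnricher extends Serializable {
  // @transient: the field is not shipped with the serialized object;
  // lazy val: each executor rebuilds its own client on first access.
  @transient lazy val client = new NonSerializableClient
  def enrich(record: String): String = client.lookup(record)
}

// Usage: the enricher itself is serializable, the client is recreated remotely.
// rdd.map(new RecordEnricher().enrich)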

...Manas






Re: Serialization issue with Spark

2016-03-23 Thread Dirceu Semighini Filho
Hello Hafsa,
A 'Task not serializable' exception usually means that you are trying to use an
object defined in the driver in code that runs on the workers.
Can you post the code that is generating this error here, so we can better
advise you?
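As a minimal sketch of that situation and one common fix (the names are made up
for illustration, and `rdd` is assumed to be an existing RDD[Int]):

// A driver-side object that does not implement Serializable.
class DriverSideHelper { val factor: Int = 10 }
val helper = new DriverSideHelper

// This would fail with "Task not serializable", because the closure
// captures `helper` itself and tries to ship it to the workers:
//   rdd.map(x => x * helper.factor)

// Fix: copy the serializable value you actually need into a local val,
// so only that value is captured by the closure.
val factor = helper.factor
val scaled = rdd.map(x => x * factor)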

Cheers.

2016-03-23 14:14 GMT-03:00 Hafsa Asif <hafsa.a...@matchinguu.com>:

> Can anyone please help me in this issue?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Serialization issue with Spark

2016-03-23 Thread Hafsa Asif
Can anyone please help me in this issue?






Re: Serialization issue with Spark

2016-03-22 Thread Ted Yu
Can you show a code snippet and the exception for 'Task is not serializable'?

Please see related JIRA:
  SPARK-10251
whose pull request contains code for registering classes with Kryo.
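For reference, a minimal sketch of registering application classes with Kryo
(the case classes below are placeholders, not code from that pull request):

import org.apache.spark.SparkConf

// Placeholder application classes.
case class MyRecord(id: Long, name: String)
case class MyOtherRecord(values: Seq[Double])

val conf = new SparkConf()
  .setAppName("kryo-registration-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a small numeric id instead of the
  // full class name for every serialized instance.
  .registerKryoClasses(Array(classOf[MyRecord], classOf[MyOtherRecord]))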

Cheers

On Tue, Mar 22, 2016 at 7:00 AM, Hafsa Asif <hafsa.a...@matchinguu.com>
wrote:

> Hello,
> I am facing Spark serialization issue in Spark (1.4.1 - Java Client) with
> Spring Framework. It is known that Spark needs serialization and it
> requires
> every class need to be implemented with java.io.Serializable. But, in the
> documentation link: http://spark.apache.org/docs/latest/tuning.html, it is
> mentioned that it is not a good approach and better to use Kryo.
> I am using Kryo in Spark configuration like this:
>   public @Bean DeepSparkContext sparkContext(){
> DeepSparkConfig conf = new DeepSparkConfig();
> conf.setAppName(this.environment.getProperty("APP_NAME"))
> .setMaster(master)
> .set("spark.executor.memory",
> this.environment.getProperty("SPARK_EXECUTOR_MEMORY"))
> .set("spark.cores.max",
> this.environment.getProperty("SPARK_CORES_MAX"))
> .set("spark.default.parallelism",
> this.environment.getProperty("SPARK_DEFAULT_PARALLELISM"));
> conf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
> return new DeepSparkContext(conf);
> }
>
> but still getting exception in Spark that 'Task is not serializable'. I
> also
> donot want to make spark contect 'static'.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Serialization issue with Spark

2016-03-22 Thread Hafsa Asif
Hello,
I am facing a serialization issue in Spark (1.4.1, Java client) with the Spring
Framework. It is known that Spark needs serialization and requires every class
involved to implement java.io.Serializable. But in the documentation link:
http://spark.apache.org/docs/latest/tuning.html, it is mentioned that this is
not the best approach and that it is better to use Kryo.
I am using Kryo in the Spark configuration like this:
  public @Bean DeepSparkContext sparkContext(){
DeepSparkConfig conf = new DeepSparkConfig();
conf.setAppName(this.environment.getProperty("APP_NAME"))
.setMaster(master)
.set("spark.executor.memory",
this.environment.getProperty("SPARK_EXECUTOR_MEMORY"))
.set("spark.cores.max",
this.environment.getProperty("SPARK_CORES_MAX"))
.set("spark.default.parallelism",
this.environment.getProperty("SPARK_DEFAULT_PARALLELISM"));
conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
return new DeepSparkContext(conf);
}

but I am still getting the 'Task is not serializable' exception in Spark. I
also do not want to make the Spark context static.







DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Eugene Morozov
Hi,

I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
trying to save my data frame to parquet.
The issue I'm stuck on looks like the serialization code is trying to do a
pretty weird thing: it tries to write into an empty array.

The last (through stack trace) line of spark code that leads to exception
is in method SerializationDebugger.visitSerializable(o: Object, stack:
List[String]): List[String].
desc.getObjFieldValues(finalObj, objFieldValues)

The reason it does so, is because finalObj is
org.apache.spark.sql.execution.Project and objFieldValues is an empty
array! As a result there are two fields to read from the Project instance
object (happens in java.io.ObjectStreamClass), but there is an empty array
to read into.

A little bit of code with debug info:
private def visitSerializable(o: Object, stack: List[String]): List[String] = {
  val (finalObj, desc) = findObjectAndDescriptor(o) // finalObj: Project, desc: org.apache.spark.sql.execution.Project
  val slotDescs = desc.getSlotDescs // java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project]
  var i = 0 // i: 0
  while (i < slotDescs.length) {
    val slotDesc = slotDescs(i) // slotDesc: org.apache.spark.sql.execution.SparkPlan
    if (slotDesc.hasWriteObjectMethod) {
      // TODO: Handle classes that specify writeObject method.
    } else {
      val fields: Array[ObjectStreamField] = slotDesc.getFields // fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
      val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields) // objFieldValues: java.lang.Object[0]
      val numPrims = fields.length - objFieldValues.length // numPrims: 1
      desc.getObjFieldValues(finalObj, objFieldValues) // leads to exception

So it looks like it gets objFieldValues array from the SparkPlan object,
but uses it to receive values from Project object.

Here is the schema of my data frame
root
 |-- Id: long (nullable = true)
 |-- explodes: struct (nullable = true)
 ||-- Identifiers: array (nullable = true)
 |||-- element: struct (containsNull = true)
 ||||-- Type: array (nullable = true)
 |||||-- element: string (containsNull = true)
 |-- Identifiers: struct (nullable = true)
 ||-- Type: array (nullable = true)
 |||-- element: string (containsNull = true)
 |-- Type2: string (nullable = true)
 |-- Type: string (nullable = true)

Actual stack trace is:
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at
com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at

Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Reynold Xin
You've probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-7180

It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to
false and see if it goes away.
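As a minimal sketch, the flag can be set either on the SparkConf or on the
command line:

// On the SparkConf:
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer.extraDebugInfo", "false")

// Or when submitting the job:
//   spark-submit --conf spark.serializer.extraDebugInfo=false ...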


On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com
wrote:

 Hi,

 I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
 trying to save my data frame to parquet.
 The issue I'm stuck looks like serialization tries to do pretty weird
 thing: tries to write to an empty array.

 The last (through stack trace) line of spark code that leads to exception
 is in method SerializationDebugger.visitSerializable(o: Object, stack:
 List[String]): List[String].
 desc.getObjFieldValues(finalObj, objFieldValues)

 The reason it does so, is because finalObj is
 org.apache.spark.sql.execution.Project and objFieldValues is an empty
 array! As a result there are two fields to read from the Project instance
 object (happens in java.io.ObjectStreamClass), but there is an empty array
 to read into.

 A little bit of code with debug info:
 private def visitSerializable(o: Object, stack: List[String]):
 List[String] = {
 val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project,
 desc: org.apache.spark.sql.execution.Project
   val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0:
 SparkPlan, 1: Project]
   var i = 0 //i: 0
   while (i < slotDescs.length) {
 val slotDesc = slotDescs(i) //slotDesc:
 org.apache.spark.sql.execution.SparkPlan
 if (slotDesc.hasWriteObjectMethod) {
   // TODO: Handle classes that specify writeObject method.
 } else {
   val fields: Array[ObjectStreamField] = slotDesc.getFields
 //fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
   val objFieldValues: Array[Object] = new
 Array[Object](slotDesc.getNumObjFields) //objFieldValues:
 java.lang.Object[0]
   val numPrims = fields.length - objFieldValues.length //numPrims:
 1
   desc.getObjFieldValues(finalObj, objFieldValues) //leads to
 exception

 So it looks like it gets objFieldValues array from the SparkPlan object,
 but uses it to receive values from Project object.

 Here is the schema of my data frame
 root
  |-- Id: long (nullable = true)
  |-- explodes: struct (nullable = true)
  ||-- Identifiers: array (nullable = true)
  |||-- element: struct (containsNull = true)
  ||||-- Type: array (nullable = true)
  |||||-- element: string (containsNull = true)
  |-- Identifiers: struct (nullable = true)
  ||-- Type: array (nullable = true)
  |||-- element: string (containsNull = true)
  |-- Type2: string (nullable = true)
  |-- Type: string (nullable = true)

 Actual stack trace is:
 org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
 at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
 at
 com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
 Caused by: 

serialization issue

2015-08-13 Thread 周千昊
Hi,
I am using Spark 1.4 and an issue has occurred to me.
I am trying to use the aggregate function:

JavaRDD<String> rdd = /* some rdd */;
HashMap<Long, TypeA> zeroValue = new HashMap<>();
// add initial key-value pair for zeroValue
rdd.aggregate(zeroValue,
    new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() { /* implementation */ },
    new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() { /* implementation */ });

Here is the stack trace when I run the application:

Caused by: java.lang.ClassNotFoundException: TypeA
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
at
org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
at
org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
 *However, I have checked that TypeA is in the jar file, which is on the
classpath.*
*And when I use an empty HashMap as the zeroValue, the exception is gone.*


Re: serialization issue

2015-08-13 Thread Anish Haldiya
While submitting the job, you can use the --jars, --driver-class-path, etc.
options to add the jar. Apart from that, if you are running the job as a
standalone application, you can use sc.addJar to add the jar (which will ship
it to all the executors).
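For example (the jar paths and class name below are placeholders):

// At submit time:
//   spark-submit --class com.example.MyJob \
//     --jars /path/to/typea-classes.jar \
//     /path/to/my-application.jar

// Or from a standalone application, after the SparkContext is created;
// addJar ships the jar to every executor:
sc.addJar("/path/to/typea-classes.jar")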

Regards,

Anish


On 8/13/15, 周千昊 qhz...@apache.org wrote:
 Hi,
 I am using spark 1.4 when an issue occurs to me.
 I am trying to use the aggregate function:
 JavaRddString rdd = some rdd;
 HashMapLong, TypeA zeroValue = new HashMap();
 // add initial key-value pair for zeroValue
 rdd.aggregate(zeroValue,
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation},
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation})

 here is the stack trace when i run the application:

 Caused by: java.lang.ClassNotFoundException: TypeA
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at java.util.HashMap.readObject(HashMap.java:1180)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
 at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
 at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
 at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
 at
 org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
 at
 org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
  *however I have checked that TypeA is in the jar file which is in the
 classpath*
 *And when I use an empty HashMap as the zeroValue, the exception has
 gone*





Re: serialization issue with mapPartitions

2014-12-26 Thread Akhil
You cannot pass your jobConf object inside any of the transformation functions
in Spark (like map, mapPartitions, etc.), since org.apache.hadoop.mapreduce.Job
is not Serializable. You can use the KryoSerializer (see this doc:
http://spark.apache.org/docs/latest/tuning.html#data-serialization). What we
usually do is convert the JobConf into a byte array and pass that byte array
object into the map; from there we create the jobConf (a new variable) from the
data inside the byte array object.
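A minimal sketch of that byte-array approach, assuming the `job` and RDD from
the original post and a hypothetical handle() function:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration

// Driver side: Configuration is a Writable, so dump it into a byte array,
// which is plainly serializable and can be captured by the closure.
val baos = new ByteArrayOutputStream()
job.getConfiguration().write(new DataOutputStream(baos))
val confBytes: Array[Byte] = baos.toByteArray

val sessions = rdd.mapPartitions { valueIterator =>
  // Executor side: rebuild a fresh Configuration from the bytes.
  val config = new Configuration(false)
  config.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
  valueIterator.map(value => handle(value, config)) // handle() is a placeholder
}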


 I should rephrase my question as follows:
 
 How to use the corresponding Hadoop Configuration of a HadoopRDD in
 defining a function as an input parameter to the MapPartitions function?
 
 Thanks.
 
 Ey-Chih Chow








serialization issue with mapPartitions

2014-12-25 Thread ey-chih chow
Hi,

I got some issues with mapPartitions with the following piece of code:

val sessions = sc
  .newAPIHadoopFile(
    ... path to an avro file ...,
    classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
    classOf[AvroKey[ByteBuffer]],
    classOf[NullWritable],
    job.getConfiguration())
  .mapPartitions { valueIterator =>
    val config = job.getConfiguration()
    .
    .
    .
  }
  .collect()

Why does job.getConfiguration() in the mapPartitions function generate the
following message?

Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job

If I take out 'val config = job.getConfiguration()' in the mapPartitions, the
code works fine, even though job.getConfiguration() also shows up in
newAPIHadoopFile().

Ey-Chih Chow






Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 1:32 AM, ey-chih chow eyc...@hotmail.com wrote:

 I got some issues with mapPartitions with the following piece of code:

 val sessions = sc
   .newAPIHadoopFile(
 ... path to an avro file ...,
 classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[ByteBuffer]],
 classOf[AvroKey[ByteBuffer]],
 classOf[NullWritable],
 job.getConfiguration())
   .mapPartitions { valueIterator =
 val config = job.getConfiguration()
  .
   }
   .collect()

 Why job.getConfiguration() in the function mapPartitions will generate the
 following message?

 Cause: java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job


The functions inside mapPartitions() will be executed on the Spark
executors, not the Spark driver. Therefore, the function body needs to be
serialized and sent to the executors via network. If that is not possible
(in your case, `job` cannot be serialized), you will get a
NotSerializableException. It works inside newAPIHadoopFile because this is
executed on the driver.

Tobias


Re: serialization issue with mapPartitions

2014-12-25 Thread ey-chih chow
I should rephrase my question as follows:

How do I use the corresponding Hadoop Configuration of a HadoopRDD when defining
a function that is passed as an input parameter to the mapPartitions function?

Thanks.

Ey-Chih Chow






Re: serialization issue with mapPartitions

2014-12-25 Thread Tobias Pfeiffer
Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow eyc...@hotmail.com wrote:

 I should rephrase my question as follows:

 How to use the corresponding Hadoop Configuration of a HadoopRDD in
 defining
 a function as an input parameter to the MapPartitions function?


Well, you could try to pull the `val config = job.getConfiguration()` out
of the function and just use `config` inside the function, hoping that this
one is serializable.

Tobias


RE: serialization issue with mapPartitions

2014-12-25 Thread Shao, Saisai
Hi,

The Hadoop Configuration is only Writable, not Java Serializable. You can use 
SerializableWritable (in Spark) to wrap the Configuration to make it 
serializable, and use a broadcast variable to broadcast this conf to all the 
nodes; then you can use it in mapPartitions rather than serializing it within 
the closure.

You can refer to org.apache.spark.rdd.HadoopRDD, where there is a usage 
scenario similar to yours.
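A minimal sketch of that approach, assuming a Spark 1.x release where
org.apache.spark.SerializableWritable is public, and a hypothetical process()
function:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

// Driver: wrap the Hadoop Configuration (a Writable) and broadcast it once.
val confBroadcast = sc.broadcast(new SerializableWritable(job.getConfiguration()))

rdd.mapPartitions { valueIterator =>
  // Executor: unwrap the broadcast value to get a usable Configuration.
  val config: Configuration = confBroadcast.value.value
  valueIterator.map(value => process(value, config)) // process() is a placeholder
}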

Thanks
Jerry.

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Friday, December 26, 2014 9:38 AM
To: ey-chih chow
Cc: user
Subject: Re: serialization issue with mapPartitions

Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow 
eyc...@hotmail.commailto:eyc...@hotmail.com wrote:
I should rephrase my question as follows:

How to use the corresponding Hadoop Configuration of a HadoopRDD in defining
a function as an input parameter to the MapPartitions function?

Well, you could try to pull the `val config = job.getConfiguration()` out of 
the function and just use `config` inside the function, hoping that this one is 
serializable.

Tobias




Re:Re: Serialization issue when using HBase with Spark

2014-12-23 Thread yangliuyu
  .map { case (deviceId, uid) =
 uid}.distinct().sortBy(x=x).mapPartitions(iterator={
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, actions)
  val result = iterator.map{ userId=
(userId, getUserActions(table, userId, timeStart, timeStop))
  }
  table.close()
  result
})

 But got the exception:
 org.apache.spark.SparkException: Task not serializable
at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
 ...
 Caused by: java.io.NotSerializableException:
 org.apache.hadoop.conf.Configuration
at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

 The reason not using sc.newAPIHadoopRDD is it only support one scan each
 time.
 val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

 And if using MultiTableInputFormat, driver is not possible put all rowkeys
 into HBaseConfiguration
 Option 2:
 sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

 It may divide all rowkey ranges into several parts then use option 2, but I
 prefer option 1. So is there any solution for option 1?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Shixiong Zhu
Just to point out a bug in your code: you should not use `mapPartitions` like
that. For details, I recommend the section on setup() and cleanup() in Sean
Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
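To make the bug concrete: iterator.map is lazy, so table.close() in the original
snippet runs before any row is actually fetched. A minimal (if memory-hungry)
sketch of a fix is to materialize the partition before closing the table; Sean
Owen's post describes a streaming-friendly variant. This reuses getUserActions,
timeStart and timeStop from the original post:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { userIds =>
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    // Force the lookups to run while the table is still open.
    val results = userIds.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }.toList
    table.close()
    results.iterator
  }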

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

 In #1, class HTable can not be serializable.
 You also need to check you self defined function getUserActions and make
 sure it is a member function of one class who implement serializable
 interface.

 发自我的 iPad

  在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道:
 
  The scenario is using HTable instance to scan multiple rowkey range in
 Spark
  tasks look likes below:
  Option 1:
  val users = input
   .map { case (deviceId, uid) =
  uid}.distinct().sortBy(x=x).mapPartitions(iterator={
   val conf = HBaseConfiguration.create()
   val table = new HTable(conf, actions)
   val result = iterator.map{ userId=
 (userId, getUserActions(table, userId, timeStart, timeStop))
   }
   table.close()
   result
 })
 
  But got the exception:
  org.apache.spark.SparkException: Task not serializable
 at
 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
  ...
  Caused by: java.io.NotSerializableException:
  org.apache.hadoop.conf.Configuration
 at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 
  The reason not using sc.newAPIHadoopRDD is it only support one scan each
  time.
  val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  And if using MultiTableInputFormat, driver is not possible put all
 rowkeys
  into HBaseConfiguration
  Option 2:
  sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  It may divide all rowkey ranges into several parts then use option 2,
 but I
  prefer option 1. So is there any solution for option 1?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Aniket Bhatnagar
The reason not using sc.newAPIHadoopRDD is it only support one scan each
time.

I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)

where convertScanToString is implemented as:

/**
 * Serializes a HBase scan into string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu zsxw...@gmail.com wrote:

 Just point out a bug in your codes. You should not use `mapPartitions`
 like that. For details, I recommend Section setup() and cleanup() in Sean
 Owen's post:
 http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

 Best Regards,
 Shixiong Zhu

 2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

 In #1, class HTable can not be serializable.
 You also need to check you self defined function getUserActions and make
 sure it is a member function of one class who implement serializable
 interface.

 发自我的 iPad

  在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道:
 
  The scenario is using HTable instance to scan multiple rowkey range in
 Spark
  tasks look likes below:
  Option 1:
  val users = input
   .map { case (deviceId, uid) =
  uid}.distinct().sortBy(x=x).mapPartitions(iterator={
   val conf = HBaseConfiguration.create()
   val table = new HTable(conf, actions)
   val result = iterator.map{ userId=
 (userId, getUserActions(table, userId, timeStart, timeStop))
   }
   table.close()
   result
 })
 
  But got the exception:
  org.apache.spark.SparkException: Task not serializable
 at
 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
  ...
  Caused by: java.io.NotSerializableException:
  org.apache.hadoop.conf.Configuration
 at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 
  The reason not using sc.newAPIHadoopRDD is it only support one scan each
  time.
  val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  And if using MultiTableInputFormat, driver is not possible put all
 rowkeys
  into HBaseConfiguration
  Option 2:
  sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  It may divide all rowkey ranges into several parts then use option 2,
 but I
  prefer option 1. So is there any solution for option 1?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Serialization issue when using HBase with Spark

2014-12-14 Thread Yanbo
In #1, class HTable cannot be serialized.
You also need to check your self-defined function getUserActions and make sure
it is a member function of a class that implements the Serializable interface.

发自我的 iPad

 在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道:
 
 The scenario is using HTable instance to scan multiple rowkey range in Spark
 tasks look likes below:
 Option 1:
 val users = input
  .map { case (deviceId, uid) =
 uid}.distinct().sortBy(x=x).mapPartitions(iterator={
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, actions)
  val result = iterator.map{ userId=
(userId, getUserActions(table, userId, timeStart, timeStop))
  }
  table.close()
  result
})
 
 But got the exception:
 org.apache.spark.SparkException: Task not serializable
at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
 ...
 Caused by: java.io.NotSerializableException:
 org.apache.hadoop.conf.Configuration
at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 
 The reason not using sc.newAPIHadoopRDD is it only support one scan each
 time.
 val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]) 
 
 And if using MultiTableInputFormat, driver is not possible put all rowkeys
 into HBaseConfiguration
 Option 2:
 sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
 
 It may divide all rowkey ranges into several parts then use option 2, but I
 prefer option 1. So is there any solution for option 1? 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 




Serialization issue when using HBase with Spark

2014-12-12 Thread yangliuyu
The scenario is using an HTable instance to scan multiple rowkey ranges in
Spark tasks, which looks like below:
Option 1:
val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But got the exception:
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
...
Caused by: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan
at a time.
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]) 

And if using MultiTableInputFormat, it is not possible for the driver to put
all rowkeys into the HBaseConfiguration.
Option 2:
sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

One option is to divide all rowkey ranges into several parts and then use
option 2, but I prefer option 1. So is there any solution for option 1?






Re: Serialization issue when using HBase with Spark

2014-12-12 Thread Akhil Das
Can you paste the complete code? It looks like at some point you are passing a
Hadoop Configuration, which is not Serializable. You can look at this thread
for a similar discussion:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-td13378.html

Thanks
Best Regards

On Fri, Dec 12, 2014 at 2:05 PM, yangliuyu yangli...@163.com wrote:

 The scenario is using HTable instance to scan multiple rowkey range in
 Spark
 tasks look likes below:
 Option 1:
 val users = input
   .map { case (deviceId, uid) =
 uid}.distinct().sortBy(x=x).mapPartitions(iterator={
   val conf = HBaseConfiguration.create()
   val table = new HTable(conf, actions)
   val result = iterator.map{ userId=
 (userId, getUserActions(table, userId, timeStart, timeStop))
   }
   table.close()
   result
 })

 But got the exception:
 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at

 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
 ...
 Caused by: java.io.NotSerializableException:
 org.apache.hadoop.conf.Configuration
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

 The reason not using sc.newAPIHadoopRDD is it only support one scan each
 time.
 val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])

 And if using MultiTableInputFormat, driver is not possible put all rowkeys
 into HBaseConfiguration
 Option 2:
 sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])

 It may divide all rowkey ranges into several parts then use option 2, but I
 prefer option 1. So is there any solution for option 1?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




serialization issue in case of case class is more than 1

2014-12-03 Thread Rahul Bindlish
Hi,

I am a newbie in Spark and performed the following steps during POC execution:

1. Map a csv file to an object-file after some transformations, once.
2. Load the object-file back into an RDD for operations, as needed.

In the case of 2 csv/object-files, the first object-file is loaded into an RDD
successfully, but while loading the second object-file an error appears. This
error occurs only when spark-shell is restarted between step 1 and step 2.

Please suggest how to load the 2 object-files.

Also find below executed code on spark-shell
***
//#1// Start spark-shell; csv to object-file creation
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

case class person(id: Int, name: String, fathername: String, officeid: Int)
val baseperson = sc.textFile("person_csv").flatMap(line =>
  line.split("\n")).map(_.split(","))
baseperson.map(p => person(p(0).trim.toInt, p(1), p(2),
  p(3).trim.toInt)).saveAsObjectFile("person_obj")

case class office(id: Int, name: String, landmark: String, areacode: String)
val baseoffice = sc.textFile("office_csv").flatMap(line =>
  line.split("\n")).map(_.split(","))
baseoffice.map(p => office(p(0).trim.toInt, p(1), p(2),
  p(3))).saveAsObjectFile("office_obj")

//#2// Stop spark-shell
//#3// Start spark-shell and map the object-files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
case class person(id: Int, name: String, fathername: String, officeid: Int)
case class office(id: Int, name: String, landmark: String, areacode: String)

sc.objectFile[person]("person_obj").count   // [OK]
sc.objectFile[office]("office_obj").count   // *[FAILS]*
***
The stack trace is attached:
stacktrace.txt
http://apache-spark-user-list.1001560.n3.nabble.com/file/n20334/stacktrace.txt
***

Regards,
Rahul   










Re: Java API - Serialization Issue

2014-03-25 Thread santhoma
This worked great. Thanks a lot





Re: Java API - Serialization Issue

2014-03-24 Thread santhoma
I am also facing the same problem. I have implemented Serializable for my
code, but the exception is thrown from third-party libraries over which I have
no control.

Exception in thread "main" org.apache.spark.SparkException: Job aborted:
Task not serializable: java.io.NotSerializableException: (lib class name
here)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

Is it mandatory that Serializable must be implemented for dependent jars as
well?


