Sorry, I have also posted this issue, now solved, to the Spark groups. The details are below.

The problem seems to lie in the Java generic operation in Commons Lang when
running on Spark on YARN, or in the Java generics mechanism itself.


The Spark code and the class's deserialization code (using Apache Commons Lang) look like this:

val fis = spark.sparkContext.binaryFiles("/folder/abc*.file")
val RDD = fis.map(x => {
  val content = x._2.toArray()
  val b = Block.deserializeFrom(content)
  ...
})




public static Block deserializeFrom(byte[] bytes) {
    try {
        Block b = SerializationUtils.deserialize(bytes);
        System.out.println("b="+b);
        return b;
    } catch (ClassCastException e) {
        System.out.println("ClassCastException");
        e.printStackTrace();
    } catch (IllegalArgumentException e) {
        System.out.println("IllegalArgumentException");
        e.printStackTrace();

    } catch (SerializationException e) {
        System.out.println("SerializationException");
        e.printStackTrace();
    }
    return null;
}

Below is the Commons Lang source code for deserialize:

public static <T> T deserialize(final byte[] objectData) {
    Validate.isTrue(objectData != null, "The byte[] must not be null");
    return deserialize(new ByteArrayInputStream(objectData));
}


public static <T> T deserialize(final InputStream inputStream) {
    Validate.isTrue(inputStream != null, "The InputStream must not be null");
    try (ObjectInputStream in = new ObjectInputStream(inputStream)) {
        @SuppressWarnings("unchecked")
        final T obj = (T) in.readObject();
        return obj;
    } catch (final ClassNotFoundException | IOException ex) {
        throw new SerializationException(ex);
    }
}

In Spark local mode, the code runs fine. But in cluster mode on YARN, the
Spark job fails with this error:

org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: com.Block
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:227)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:265)
    at com.com.XXXX.XXXX.deserializeFrom(XXX.java:81)
    at com.XXX.FFFF$$anonfun$3.apply(BXXXX.scala:157)
    at com.XXX.FFFF$$anonfun$3.apply(BXXXX.scala:153)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.Block
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:223)


From the error, we can see that the failure happens in the Apache Commons Lang package at

final T obj = (T) in.readObject();

T is the Block class, and when the code tries to cast the object to T (Block),
it seems that the Block class cannot be found in the JVM, so a
ClassNotFoundException is thrown.
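
A small sketch may help check this reasoning. Java generics are erased at run time, so a cast to T inside a generic method is unchecked and does not by itself look up any class; in the trace above, the lookup that fails is in ObjectInputStream.resolveClass, inside in.readObject(). The names ErasureDemo and uncheckedCast below are hypothetical and exist only for illustration.

```java
final class ErasureDemo {
    // Same shape as the cast in the quoted deserialize method: T is erased,
    // so this cast performs no run-time type check inside the method.
    @SuppressWarnings("unchecked")
    static <T> T uncheckedCast(Object value) {
        return (T) value;
    }

    // Returns true if the ClassCastException surfaces at the call site,
    // that is, only after uncheckedCast has already returned normally.
    static boolean castFailsAtCallSite() {
        // T is inferred as Object here, so nothing is checked and nothing fails.
        Object result = uncheckedCast(Integer.valueOf(42));
        try {
            // The compiler inserts the real cast to String at this assignment.
            String s = uncheckedCast(Integer.valueOf(42));
            return s == null; // not reached
        } catch (ClassCastException e) {
            return result instanceof Integer;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails at call site: " + castFailsAtCallSite());
    }
}
```

If this holds, the generic T itself is probably not what triggers the class lookup, and the difference between the two versions may lie elsewhere, for example in which class loader is used during readObject.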


I then copied the Lang source code and replaced T with Block directly. With
the version below, the program runs OK again:

public static Block deserializeFrom(byte[] bytes) {
    //Block b = SerializationUtils.deserialize(bytes);
    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    try (ObjectInputStream in = new ObjectInputStream(inputStream)) {
        return (Block) in.readObject();
    } catch (final ClassNotFoundException | IOException ex) {
        System.out.println("ClassNotFoundException | IOException");
        ex.printStackTrace();
    } catch (ClassCastException e) {
        System.out.println("ClassCastException");
        e.printStackTrace();
    } catch (IllegalArgumentException e) {
        System.out.println("IllegalArgumentException");
        e.printStackTrace();
    } catch (SerializationException e) {
        System.out.println("SerializationException");
        e.printStackTrace();
    }
    return null;
}
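
One possible explanation for why the copied code works (an assumption, not verified on this cluster): the default ObjectInputStream.resolveClass looks classes up through the class loader of the nearest user-defined caller on the stack. When readObject is called from SerializationUtils, that loader may be the application class loader seen in the trace (sun.misc.Launcher$AppClassLoader), which might not see the user jar on YARN; when it is called from Block itself, the loader is the one that loaded Block. Under that assumption, an alternative sketch is a stream that resolves classes through an explicitly passed loader. LoaderAwareDeserializer and its nested LoaderAwareObjectInputStream are hypothetical names, not part of Commons Lang or Spark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class LoaderAwareDeserializer {

    // ObjectInputStream that tries the supplied class loader first.
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup (also covers primitive type names).
                return super.resolveClass(desc);
            }
        }
    }

    // Plain JDK serialization, used here only to produce test bytes.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes using the given loader for class resolution.
    static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in =
                     new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize("hello");
        Object back = deserialize(data, Thread.currentThread().getContextClassLoader());
        System.out.println(back);
    }
}
```

In a Spark task, the loader passed in could be Thread.currentThread().getContextClassLoader() or Block.class.getClassLoader(); which of these sees the user jar in a given deployment is something to confirm rather than assume.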


On 2019/6/28 at 10:08 AM, Tomo Suzuki wrote:

Glad to hear you made progress. Good luck!

(Another possibility: you might have changed the package or class name
since you saved the HDFS file.)

On Thu, Jun 27, 2019 at 21:25 big data <bigdatab...@outlook.com> wrote:



Thanks. I've tried it, and calling new Block() before deserializing works fine.

I've solved it and posted another message describing the process. For
details, see the other email: Java Generic T makes ClassNotFoundException

On 2019/6/27 at 8:41 PM, Tomo Suzuki wrote:

My suggestion after reading ClassNotFoundException is to try to instantiate
the class just before deserializing it:

public static Block deserializeFrom(byte[] bytes) {
    // Dummy instantiation to ensure Block class and its related classes are available
    System.out.println("dummy = " + new Block());
    System.out.println("byte length = " + bytes.length); // Does this match what you expect?
    try {
        Block b = SerializationUtils.deserialize(bytes);
...


Looking forward to hearing the result.



On Wed, Jun 26, 2019 at 11:03 PM big data <bigdatab...@outlook.com> wrote:





The XXX class is named Block. Below is part of its code.

The deserialization code looks like this:

public static Block deserializeFrom(byte[] bytes) {
    try {
        Block b = SerializationUtils.deserialize(bytes);
        System.out.println("b="+b);
        return b;
    } catch (ClassCastException e) {
        System.out.println("ClassCastException");
        e.printStackTrace();
    } catch (IllegalArgumentException e) {
        System.out.println("IllegalArgumentException");
        e.printStackTrace();

    } catch (SerializationException e) {
        System.out.println("SerializationException");
        e.printStackTrace();
    }
    return null;
}


The Spark code is:

val fis = spark.sparkContext.binaryFiles("/folder/abc*.file")
val RDD = fis.map(x => {
  val content = x._2.toArray()
  val b = Block.deserializeFrom(content)
  ...
})


All the code above runs successfully in Spark local mode, but when it is run
in YARN cluster mode, the error occurs.

On 2019/6/27 at 9:49 AM, Tomo Suzuki wrote:

I'm afraid that I don't have enough information to troubleshoot the problem in
com.XXX.XXX. It would be great if you could create a minimal example project
that reproduces the same issue.

Regards,
Tomo

On Wed, Jun 26, 2019 at 9:20 PM big data <bigdatab...@outlook.com> wrote:



Hi,

Actually, the class com.XXX.XXX is called normally earlier in the Spark
code, and this exception occurs in one static method of this class.

So a jar dependency problem can be ruled out.

On 2019/6/26 at 10:23 PM, Tomo Suzuki wrote:


Hi Big data,

I don't use SerializationUtils, but if I interpret the error message:

   ClassNotFoundException: com.XXXX.XXXX

, this says com.XXXX.XXXX is not available in the class path of the JVM
(which your Spark is running on). I would verify that you can instantiate
com.XXXX.XXXX in Spark/Scala *without* SerializationUtils.
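
A check along these lines could be sketched as follows. It only reports whether a class name can be resolved through a given class loader; the class ClassVisibility, its methods, and the name com.example.NoSuchClass are hypothetical and used for illustration. Running something like it inside the Spark task with the real class name would show whether the system loader and the thread's context loader disagree.

```java
final class ClassVisibility {

    // Returns true if `loader` can resolve `className` (without initializing it).
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Summarizes visibility from the system loader and the context loader.
    static String report(String className) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        return className
                + ": system loader=" + isVisible(className, system)
                + ", context loader=" + isVisible(className, context);
    }

    public static void main(String[] args) {
        System.out.println(report("java.lang.String"));
        System.out.println(report("com.example.NoSuchClass")); // hypothetical name
    }
}
```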

Regards,
Tomo



On Wed, Jun 26, 2019 at 4:12 AM big data <bigdatab...@outlook.com> wrote:





I use Apache Commons Lang3's SerializationUtils in my code. I call

SerializationUtils.serialize()

to store instances of a custom class as files on disk, and

SerializationUtils.deserialize(byte[])

to restore them.

In the local environment (Mac OS), all serialized files can be
deserialized normally and no error occurs. But when I copy these
serialized files into HDFS and read them from HDFS using Spark/Scala, a
SerializationException is thrown.

The Apache Commons Lang3 version is:

     <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-lang3</artifactId>
         <version>3.9</version>
     </dependency>


The stack trace is below:

org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: com.XXXX.XXXX
     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:227)
     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:265)
     at com.com.XXXX.XXXX.deserializeFrom(XXX.java:81)
     at com.XXX.FFFF$$anonfun$3.apply(BXXXX.scala:157)
     at com.XXX.FFFF$$anonfun$3.apply(BXXXX.scala:153)
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
     at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
     at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
     at org.apache.spark.scheduler.Task.run(Task.scala:109)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.XXXX.XXXX
     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
     at java.lang.Class.forName0(Native Method)
     at java.lang.Class.forName(Class.java:348)
     at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686)
     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:223)



I've checked the length of the loaded byte[]; it is the same whether read
locally or from HDFS. So why can't it be deserialized from HDFS?
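
Equal lengths do not guarantee equal contents, so one further check is to compare a digest of the bytes read locally with a digest of the bytes read from HDFS. The sketch below uses the JDK's MessageDigest with SHA-256; the class name ByteFingerprint and the method sha256Hex are hypothetical. If both digests match, the files themselves can likely be ruled out as the cause.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ByteFingerprint {

    // Returns the SHA-256 digest of `data` as a lowercase hex string.
    static String sha256Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is unexpected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in bytes; in practice these would be the file contents
        // read locally and the array obtained inside the Spark map function.
        byte[] sample = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(sample.length + " bytes, sha256=" + sha256Hex(sample));
    }
}
```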

















--


Regards,
Tomo

