Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-26
I have replaced the default Java serialization with Kryo.
It does reduce the shuffle size, and overall performance has improved;
however, the shuffle speed itself remains unchanged.
I am quite new to Spark. Does anyone have an idea about which direction
I should go in to find the root cause?
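
For reference, the switch itself is just a serializer setting on the SparkConf.
A minimal sketch of what we set (the app name and the buffer value are
placeholders, not our real configuration):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class KryoConfSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("hfile-bulkload-poc")  // placeholder name
                // replace the default Java serialization with Kryo
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // assumed value; only matters if single records are large
                .set("spark.kryoserializer.buffer.max", "128m");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build the RDD pipeline here ...
        sc.stop();
    }
}

Registering our own value classes via conf.registerKryoClasses(...) is another
option we could try to shrink the serialized form further.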

On Fri, Oct 23, 2015 at 5:50 PM, 周千昊 <qhz...@apache.org> wrote:

> We have not tried that yet, however both implementations on MR and spark
> are tested on the same amount of partition and same cluster
>
> On Fri, Oct 23, 2015 at 5:21 PM, 250635...@qq.com <250635...@qq.com> wrote:
>
>> Hi,
>>
>> Not an expert on this kind of implementation, but referring to the
>> performance result: are the map-side partitions sized appropriately
>> for the different datasets? Have you tried increasing the number of
>> partitions?
>>
>>
>>
>>
>>
>> 250635...@qq.com
>>
>> From: Li Yang
>> Date: 2015-10-23 16:17
>> To: dev
>> CC: Reynold Xin; dev@spark.apache.org
>> Subject: Re: repartitionAndSortWithinPartitions task shuffle phase is
>> very slow
>> Any advice on how to tune the repartitionAndSortWithinPartitions stage?
>> Any particular metrics or parameters to look into? Basically Spark and MR
>> shuffle the same amount of data, because we basically copied the MR
>> implementation into Spark.
>>
>> Let us know if more info is needed.
>>
>> On Fri, Oct 23, 2015 at 10:24 AM, 周千昊 <qhz...@apache.org> wrote:
>>
>> > +kylin dev list
>> >
>> > On Fri, Oct 23, 2015 at 10:20 AM, 周千昊 <qhz...@apache.org> wrote:
>> >
>> > > Hi, Reynold
>> > >   Using glom() is because it is easy to adapt to calculation logic
>> > > already implemented in MR. And o be clear, we are still in POC.
>> > >   Since the results shows there is almost no difference between
>> this
>> > > glom stage and the MR mapper, using glom here might not be the issue.
>> > >   I was trying to monitor the network traffic when repartition
>> > > happens, and it showed that the traffic peek is about 200 - 300MB/s
>> while
>> > > it stayed at speed of about 3-4MB/s for a long time. Have you guys got
>> > any
>> > > idea about it?
>> > >
>> > > On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <r...@databricks.com> wrote:
>> > >
>> > >> Why do you do a glom? It seems unnecessarily expensive to materialize
>> > >> each partition in memory.
>> > >>
>> > >>
>> > >> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
>> > >>
>> > >>> Hi, spark community
>> > >>>   I have an application which I try to migrate from MR to Spark.
>> > >>>   It will do some calculations from Hive and output to hfile
>> which
>> > >>> will be bulk load to HBase Table, details as follow:
>> > >>>
>> > >>>  Rdd input = getSourceInputFromHive()
>> > >>>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>> > >>> input.glom().mapPartitions(/*some calculation, equivalent to MR
>> mapper
>> > >>> */)
>> > >>>  // PS: the result in each partition has already been sorted
>> > >>> according to the lexicographical order during the calculation
>> > >>>  mapSideResult.repartitionAndSortWithPartitions(/*partition with
>> > >>> byte[][] which is HTable split key, equivalent to MR shuffle
>> > */).map(/*transform
>> > >>> Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable,
>> > KeyValue>/*equivalent
>> > >>> to MR reducer without output*/).saveAsNewAPIHadoopFile(/*write to
>> > >>> hfile*/)
>> > >>>
>> > >>>   This all works fine on a small dataset, and spark outruns MR
>> by
>> > >>> about 10%. However when I apply it on a dataset of 150 million
>> > records, MR
>> > >>> is about 100% faster than spark.(*MR 25min spark 50min*)
>> > >>>After exploring into the application UI, it shows that in the
>> > >>> repartitionAndSortWithinPartitions stage is very slow, and in the
>> > shuffle
>> > >>> phase a 6GB size shuffle cost about 18min which is quite
>> unreasonable
>> > >>>*Can anyone help with this issue and give me some advice on
>> > >>> this? **It’s not iterative processing, however I believe Spark
>> could be
>> > >>> the same fast at minimal.*
>> > >>>
>> > >>>   Here are the cluster info:
>> > >>>   vm: 8 nodes * (128G mem + 64 core)
>> > >>>   hadoop cluster: hdp 2.2.6
>> > >>>   spark running mode: yarn-client
>> > >>>   spark version: 1.5.1
>> > >>>
>> > >>>
>> > >>
>> >
>>
> --
Best Regard
ZhouQianhao


Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-23
We have not tried that yet; however, both the MR and Spark implementations
are tested with the same number of partitions on the same cluster.

On Fri, Oct 23, 2015 at 5:21 PM, 250635...@qq.com <250635...@qq.com> wrote:

> Hi,
>
> Not an expert on this kind of implementation, but referring to the
> performance result: are the map-side partitions sized appropriately
> for the different datasets? Have you tried increasing the number of
> partitions?
>
>
>
>
>
> 250635...@qq.com
>
> From: Li Yang
> Date: 2015-10-23 16:17
> To: dev
> CC: Reynold Xin; dev@spark.apache.org
> Subject: Re: repartitionAndSortWithinPartitions task shuffle phase is very
> slow
> Any advice on how to tune the repartitionAndSortWithinPartitions stage?
> Any particular metrics or parameters to look into? Basically Spark and MR
> shuffle the same amount of data, because we basically copied the MR
> implementation into Spark.
>
> Let us know if more info is needed.
>
> On Fri, Oct 23, 2015 at 10:24 AM, 周千昊 <qhz...@apache.org> wrote:
>
> > +kylin dev list
> >
> > On Fri, Oct 23, 2015 at 10:20 AM, 周千昊 <qhz...@apache.org> wrote:
> >
> > > Hi, Reynold
> > >   Using glom() is because it is easy to adapt to calculation logic
> > > already implemented in MR. And o be clear, we are still in POC.
> > >   Since the results shows there is almost no difference between
> this
> > > glom stage and the MR mapper, using glom here might not be the issue.
> > >   I was trying to monitor the network traffic when repartition
> > > happens, and it showed that the traffic peek is about 200 - 300MB/s
> while
> > > it stayed at speed of about 3-4MB/s for a long time. Have you guys got
> > any
> > > idea about it?
> > >
> > > On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <r...@databricks.com> wrote:
> > >
> > >> Why do you do a glom? It seems unnecessarily expensive to materialize
> > >> each partition in memory.
> > >>
> > >>
> > >> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
> > >>
> > >>> Hi, spark community
> > >>>   I have an application which I try to migrate from MR to Spark.
> > >>>   It will do some calculations from Hive and output to hfile
> which
> > >>> will be bulk load to HBase Table, details as follow:
> > >>>
> > >>>  Rdd input = getSourceInputFromHive()
> > >>>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
> > >>> input.glom().mapPartitions(/*some calculation, equivalent to MR
> mapper
> > >>> */)
> > >>>  // PS: the result in each partition has already been sorted
> > >>> according to the lexicographical order during the calculation
> > >>>  mapSideResult.repartitionAndSortWithPartitions(/*partition with
> > >>> byte[][] which is HTable split key, equivalent to MR shuffle
> > */).map(/*transform
> > >>> Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable,
> > KeyValue>/*equivalent
> > >>> to MR reducer without output*/).saveAsNewAPIHadoopFile(/*write to
> > >>> hfile*/)
> > >>>
> > >>>   This all works fine on a small dataset, and spark outruns MR by
> > >>> about 10%. However when I apply it on a dataset of 150 million
> > records, MR
> > >>> is about 100% faster than spark.(*MR 25min spark 50min*)
> > >>>After exploring into the application UI, it shows that in the
> > >>> repartitionAndSortWithinPartitions stage is very slow, and in the
> > shuffle
> > >>> phase a 6GB size shuffle cost about 18min which is quite unreasonable
> > >>>*Can anyone help with this issue and give me some advice on
> > >>> this? **It’s not iterative processing, however I believe Spark could
> be
> > >>> the same fast at minimal.*
> > >>>
> > >>>   Here are the cluster info:
> > >>>   vm: 8 nodes * (128G mem + 64 core)
> > >>>   hadoop cluster: hdp 2.2.6
> > >>>   spark running mode: yarn-client
> > >>>   spark version: 1.5.1
> > >>>
> > >>>
> > >>
> >
>


repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22
Hi, Spark community
  I have an application which I am trying to migrate from MR to Spark.
  It does some calculations on data from Hive and outputs hfiles, which
are then bulk loaded into an HBase table. Details as follows:

 Rdd input = getSourceInputFromHive()
 Rdd<Tuple2<byte[], byte[]>> mapSideResult =
     input.glom().mapPartitions(/* some calculation, equivalent to the MR mapper */)
 // PS: the result in each partition has already been sorted in
 // lexicographical order during the calculation
 mapSideResult.repartitionAndSortWithinPartitions(/* partition with byte[][],
         which is the HTable split key; equivalent to the MR shuffle */)
     .map(/* transform Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable, KeyValue>,
         equivalent to the MR reducer without output */)
     .saveAsNewAPIHadoopFile(/* write to hfile */)
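
For reference, the partitioner mentioned in the comment above ("partition with
byte[][], which is the HTable split key") is roughly the following. This is only
a sketch with made-up names (HTableSplitKeyPartitioner, splitKeys), assuming the
split keys are the sorted region start keys with the first (empty) one omitted:

import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Partitioner;

// Sends each row key to the partition of the HBase region that will own it,
// mirroring what a total-order partitioner does on the MR side.
public class HTableSplitKeyPartitioner extends Partitioner {
    private final byte[][] splitKeys;  // sorted region start keys

    public HTableSplitKeyPartitioner(byte[][] splitKeys) {
        this.splitKeys = splitKeys;
    }

    @Override
    public int numPartitions() {
        return splitKeys.length + 1;
    }

    @Override
    public int getPartition(Object key) {
        byte[] rowKey = (byte[]) key;
        int idx = Arrays.binarySearch(splitKeys, rowKey, Bytes.BYTES_COMPARATOR);
        // an exact hit on a split key starts a new region; otherwise the
        // insertion point returned by binarySearch is the region index
        return idx >= 0 ? idx + 1 : -(idx + 1);
    }
}

In the Java API it is then passed together with a byte[] comparator, e.g.
repartitionAndSortWithinPartitions(new HTableSplitKeyPartitioner(splitKeys),
Bytes.BYTES_COMPARATOR), since byte[] keys need an explicit ordering for the sort.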

  This all works fine on a small dataset, and Spark outruns MR by about
10%. However, when I apply it to a dataset of 150 million records, MR is
about 100% faster than Spark (*MR 25min, Spark 50min*).
   Exploring the application UI shows that the
repartitionAndSortWithinPartitions stage is very slow, and in its shuffle
phase a 6GB shuffle costs about 18min, which is quite unreasonable.
   *Can anyone help with this issue and give me some advice on this? **It’s
not iterative processing, and I believe Spark should be at least as fast as
MR here.*

  Here is the cluster info:
  vm: 8 nodes * (128G mem + 64 core)
  hadoop cluster: hdp 2.2.6
  spark running mode: yarn-client
  spark version: 1.5.1


Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22
+kylin dev list

On Fri, Oct 23, 2015 at 10:20 AM, 周千昊 <qhz...@apache.org> wrote:

> Hi, Reynold
>   Using glom() is because it is easy to adapt to calculation logic
> already implemented in MR. And o be clear, we are still in POC.
>   Since the results shows there is almost no difference between this
> glom stage and the MR mapper, using glom here might not be the issue.
>   I was trying to monitor the network traffic when repartition
> happens, and it showed that the traffic peek is about 200 - 300MB/s while
> it stayed at speed of about 3-4MB/s for a long time. Have you guys got any
> idea about it?
>
> On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> Why do you do a glom? It seems unnecessarily expensive to materialize
>> each partition in memory.
>>
>>
>> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
>>
>>> Hi, spark community
>>>   I have an application which I try to migrate from MR to Spark.
>>>   It will do some calculations from Hive and output to hfile which
>>> will be bulk load to HBase Table, details as follow:
>>>
>>>  Rdd input = getSourceInputFromHive()
>>>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>> input.glom().mapPartitions(/*some calculation, equivalent to MR mapper
>>> */)
>>>  // PS: the result in each partition has already been sorted
>>> according to the lexicographical order during the calculation
>>>  mapSideResult.repartitionAndSortWithPartitions(/*partition with
>>> byte[][] which is HTable split key, equivalent to MR shuffle  
>>> */).map(/*transform
>>> Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable, 
>>> KeyValue>/*equivalent
>>> to MR reducer without output*/).saveAsNewAPIHadoopFile(/*write to
>>> hfile*/)
>>>
>>>   This all works fine on a small dataset, and spark outruns MR by
>>> about 10%. However when I apply it on a dataset of 150 million records, MR
>>> is about 100% faster than spark.(*MR 25min spark 50min*)
>>>After exploring into the application UI, it shows that in the
>>> repartitionAndSortWithinPartitions stage is very slow, and in the shuffle
>>> phase a 6GB size shuffle cost about 18min which is quite unreasonable
>>>*Can anyone help with this issue and give me some advice on
>>> this? **It’s not iterative processing, however I believe Spark could be
>>> the same fast at minimal.*
>>>
>>>   Here are the cluster info:
>>>   vm: 8 nodes * (128G mem + 64 core)
>>>   hadoop cluster: hdp 2.2.6
>>>   spark running mode: yarn-client
>>>   spark version: 1.5.1
>>>
>>>
>>


Re: repartitionAndSortWithinPartitions task shuffle phase is very slow

2015-10-22
Hi, Reynold
  We use glom() because it makes it easy to adapt the calculation logic
already implemented in MR. And to be clear, we are still in POC.
  Since the results show there is almost no difference between this
glom stage and the MR mapper, using glom here might not be the issue.
  I monitored the network traffic while the repartition happens:
the traffic peaks at about 200-300MB/s, but it stays at only about
3-4MB/s for a long time. Have you guys got any idea about it?
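
That said, if we do drop glom(), the map side would look roughly like the sketch
below. MapSideWithoutGlom and the dummy calculate() body are placeholders for our
MR-derived logic, not the real code:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

public class MapSideWithoutGlom {
    // Placeholder for the MR-derived mapper logic: consumes one partition's
    // rows and emits the already-sorted (rowkey, value) pairs.
    static List<Tuple2<byte[], byte[]>> calculate(Iterator<String> rows) {
        List<Tuple2<byte[], byte[]>> out = new ArrayList<Tuple2<byte[], byte[]>>();
        while (rows.hasNext()) {
            byte[] bytes = rows.next().getBytes();
            out.add(new Tuple2<byte[], byte[]>(bytes, bytes));  // dummy transformation
        }
        return out;
    }

    static JavaRDD<Tuple2<byte[], byte[]>> mapSide(JavaRDD<String> input) {
        // Instead of input.glom().mapPartitions(...), which first materializes
        // each partition as a list, work on the partition's iterator directly.
        return input.mapPartitions(
            new FlatMapFunction<Iterator<String>, Tuple2<byte[], byte[]>>() {
                @Override
                public Iterable<Tuple2<byte[], byte[]>> call(Iterator<String> rows) {
                    return calculate(rows);
                }
            });
    }
}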

On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <r...@databricks.com> wrote:

> Why do you do a glom? It seems unnecessarily expensive to materialize each
> partition in memory.
>
>
> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
>
>> Hi, spark community
>>   I have an application which I try to migrate from MR to Spark.
>>   It will do some calculations from Hive and output to hfile which
>> will be bulk load to HBase Table, details as follow:
>>
>>  Rdd input = getSourceInputFromHive()
>>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>> input.glom().mapPartitions(/*some calculation, equivalent to MR mapper*/)
>>  // PS: the result in each partition has already been sorted
>> according to the lexicographical order during the calculation
>>  mapSideResult.repartitionAndSortWithPartitions(/*partition with
>> byte[][] which is HTable split key, equivalent to MR shuffle  
>> */).map(/*transform
>> Tuple2<byte[], byte[]> to Tuple2<ImmutableBytesWritable, 
>> KeyValue>/*equivalent
>> to MR reducer without output*/).saveAsNewAPIHadoopFile(/*write to
>> hfile*/)
>>
>>   This all works fine on a small dataset, and spark outruns MR by
>> about 10%. However when I apply it on a dataset of 150 million records, MR
>> is about 100% faster than spark.(*MR 25min spark 50min*)
>>After exploring into the application UI, it shows that in the
>> repartitionAndSortWithinPartitions stage is very slow, and in the shuffle
>> phase a 6GB size shuffle cost about 18min which is quite unreasonable
>>*Can anyone help with this issue and give me some advice on
>> this? **It’s not iterative processing, however I believe Spark could be
>> the same fast at minimal.*
>>
>>   Here are the cluster info:
>>   vm: 8 nodes * (128G mem + 64 core)
>>   hadoop cluster: hdp 2.2.6
>>   spark running mode: yarn-client
>>   spark version: 1.5.1
>>
>>
>


Re: please help with ClassNotFoundException

2015-08-14
Hi, Sea
 Problem solved. It turned out that I had updated the Spark cluster to
1.4.1 but the client had not been updated.
 Thank you so much.

On Fri, Aug 14, 2015 at 1:01 PM, Sea <261810...@qq.com> wrote:

 I have no idea... We use Scala. You upgraded to 1.4 so quickly... are you
 using Spark in production? Spark 1.3 is better than Spark 1.4.

 -- Original Message --
 *From:* 周千昊 <z.qian...@gmail.com>
 *Sent:* Friday, Aug 14, 2015, 11:14 AM
 *To:* Sea <261810...@qq.com>; dev@spark.apache.org
 *Subject:* Re: please help with ClassNotFoundException

 Hi Sea
  I have updated spark to 1.4.1, however the problem still exists, any
 idea?

 On Fri, Aug 14, 2015 at 12:36 AM, Sea <261810...@qq.com> wrote:

 Yes, I guess so. I have seen this bug before.


 -- Original Message --
 *From:* 周千昊 <z.qian...@gmail.com>
 *Sent:* Thursday, Aug 13, 2015, 9:30 PM
 *To:* Sea <261810...@qq.com>; dev@spark.apache.org
 *Subject:* Re: please help with ClassNotFoundException

 Hi sea
 Is it the same issue as
 https://issues.apache.org/jira/browse/SPARK-8368

 On Thu, Aug 13, 2015 at 6:52 PM, Sea <261810...@qq.com> wrote:

 Are you using 1.4.0?  If yes, use 1.4.1


 -- Original Message --
 *From:* 周千昊 <qhz...@apache.org>
 *Sent:* Thursday, Aug 13, 2015, 6:04 PM
 *To:* dev <dev@spark.apache.org>
 *Subject:* please help with ClassNotFoundException

 Hi,
 I am using spark 1.4 when an issue occurs to me.
 I am trying to use the aggregate function:
 JavaRddString rdd = some rdd;
 HashMapLong, TypeA zeroValue = new HashMap();
 // add initial key-value pair for zeroValue
 rdd.aggregate(zeroValue,
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation},
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation})

 here is the stack trace when i run the application:

 Caused by: java.lang.ClassNotFoundException: TypeA
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at java.util.HashMap.readObject(HashMap.java:1180)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
 at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
 at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
 at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
 at
 org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
 at
 org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
  *however I have checked that TypeA is in the jar file which is in
 the classpath*
 *And when I use an empty HashMap as the zeroValue, the exception has
 gone*
 *Does anyone meet the same problem, or can anyone help me with it?*

 --
 Best Regard
 ZhouQianhao

 --
 Best Regard
 ZhouQianhao



avoid creating small objects

2015-08-14
Hi,
All I want to do is:
1. read from some source
2. do some calculation to get a byte array
3. write the byte array to hdfs
In Hadoop, I can share an ImmutableBytesWritable and use
System.arraycopy; this prevents the application from creating a lot of
small objects, which improves the GC latency.
*However, I was wondering whether there is any solution like the above in
Spark that can avoid creating small objects.*
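
To make the question concrete, the Hadoop pattern I am referring to would look
roughly like this in Spark. Only a sketch, under the assumption that the reused
writable is written out immediately inside the same partition and never cached
or collected downstream:

import java.util.Iterator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

public class ReusedWritableSketch {
    static void writeOut(JavaRDD<byte[]> payloads) {
        payloads.foreachPartition(new VoidFunction<Iterator<byte[]>>() {
            @Override
            public void call(Iterator<byte[]> it) {
                // one writable per partition, reused for every record, like the
                // shared ImmutableBytesWritable in an MR mapper
                ImmutableBytesWritable reused = new ImmutableBytesWritable();
                while (it.hasNext()) {
                    reused.set(it.next());
                    // write 'reused' to hdfs here (placeholder); this is safe only
                    // because nothing downstream keeps a reference to it
                }
            }
        });
    }
}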


Re: avoid creating small objects

2015-08-14
I am thinking of creating a shared object outside the closure and using this
object to hold the byte array.
Will this work?

On Fri, Aug 14, 2015 at 4:02 PM, 周千昊 <qhz...@apache.org> wrote:

 Hi,
 All I want to do is that,
 1. read from some source
 2. do some calculation to get some byte array
 3. write the byte array to hdfs
 In hadoop, I can share an ImmutableByteWritable, and do some
 System.arrayCopy, it will prevent the application from creating a lot of
 small objects which will improve the gc latency.
 *However I was wondering if there is any solution like above in spark
 that can avoid creating small objects*



please help with ClassNotFoundException

2015-08-13
Hi,
I am using Spark 1.4 and have run into an issue.
I am trying to use the aggregate function:
JavaRDD<String> rdd = /* some rdd */;
HashMap<Long, TypeA> zeroValue = new HashMap<Long, TypeA>();
// add initial key-value pair for zeroValue
rdd.aggregate(zeroValue,
    new Function2<HashMap<Long, TypeA>, String,
        HashMap<Long, TypeA>>() { /* implementation */ },
    new Function2<HashMap<Long, TypeA>, HashMap<Long, TypeA>,
        HashMap<Long, TypeA>>() { /* implementation */ });

Here is the stack trace when I run the application:

Caused by: java.lang.ClassNotFoundException: TypeA
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
at
org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
at
org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
 *However, I have checked that TypeA is in the jar file, which is on the
classpath.*
*And when I use an empty HashMap as the zeroValue, the exception goes
away.*
*Has anyone met the same problem, or can anyone help me with it?*


Re: please help with ClassNotFoundException

2015-08-13
Hi Sea,
Is it the same issue as https://issues.apache.org/jira/browse/SPARK-8368

On Thu, Aug 13, 2015 at 6:52 PM, Sea <261810...@qq.com> wrote:

 Are you using 1.4.0?  If yes, use 1.4.1


 -- Original Message --
 *From:* 周千昊 <qhz...@apache.org>
 *Sent:* Thursday, Aug 13, 2015, 6:04 PM
 *To:* dev <dev@spark.apache.org>
 *Subject:* please help with ClassNotFoundException

 Hi,
 I am using spark 1.4 when an issue occurs to me.
 I am trying to use the aggregate function:
 JavaRddString rdd = some rdd;
 HashMapLong, TypeA zeroValue = new HashMap();
 // add initial key-value pair for zeroValue
 rdd.aggregate(zeroValue,
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation},
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation})

 here is the stack trace when i run the application:

 Caused by: java.lang.ClassNotFoundException: TypeA
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at java.util.HashMap.readObject(HashMap.java:1180)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
 at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
 at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
 at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
 at
 org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
 at
 org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
  *however I have checked that TypeA is in the jar file which is in
 the classpath*
 *And when I use an empty HashMap as the zeroValue, the exception has
 gone*
 *Does anyone meet the same problem, or can anyone help me with it?*

-- 
Best Regard
ZhouQianhao


Re: please help with ClassNotFoundException

2015-08-13
Hi Sea,
 I have updated Spark to 1.4.1; however, the problem still exists. Any
idea?

On Fri, Aug 14, 2015 at 12:36 AM, Sea <261810...@qq.com> wrote:

 Yes, I guess so. I have seen this bug before.


 -- Original Message --
 *From:* 周千昊 <z.qian...@gmail.com>
 *Sent:* Thursday, Aug 13, 2015, 9:30 PM
 *To:* Sea <261810...@qq.com>; dev@spark.apache.org
 *Subject:* Re: please help with ClassNotFoundException

 Hi sea
 Is it the same issue as
 https://issues.apache.org/jira/browse/SPARK-8368

 On Thu, Aug 13, 2015 at 6:52 PM, Sea <261810...@qq.com> wrote:

 Are you using 1.4.0?  If yes, use 1.4.1


 -- Original Message --
 *From:* 周千昊 <qhz...@apache.org>
 *Sent:* Thursday, Aug 13, 2015, 6:04 PM
 *To:* dev <dev@spark.apache.org>
 *Subject:* please help with ClassNotFoundException

 Hi,
 I am using spark 1.4 when an issue occurs to me.
 I am trying to use the aggregate function:
 JavaRddString rdd = some rdd;
 HashMapLong, TypeA zeroValue = new HashMap();
 // add initial key-value pair for zeroValue
 rdd.aggregate(zeroValue,
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation},
new Function2HashMapLong, TypeA,
 String,
 HashMapLong, TypeA(){//implementation})

 here is the stack trace when i run the application:

 Caused by: java.lang.ClassNotFoundException: TypeA
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at java.util.HashMap.readObject(HashMap.java:1180)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
 at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
 at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
 at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
 at
 org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
 at
 org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
  *however I have checked that TypeA is in the jar file which is in
 the classpath*
 *And when I use an empty HashMap as the zeroValue, the exception has
 gone*
 *Does anyone meet the same problem, or can anyone help me with it?*

 --
 Best Regard
 ZhouQianhao

-- 
Best Regard
ZhouQianhao