Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
I have replaced the default Java serialization with Kryo. It does reduce the shuffle size, and performance has improved, but the shuffle speed remains unchanged. I am quite new to Spark; does anyone have an idea which direction I should look in to find the root cause?

On Fri, Oct 23, 2015 at 5:50 PM, 周千昊 <qhz...@apache.org> wrote:

> We have not tried that yet; however, both the MR and Spark implementations
> were tested with the same number of partitions on the same cluster.

--
Best Regard
ZhouQianhao
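For anyone reproducing this: a minimal sketch of how Kryo is typically enabled in Spark 1.x (the app name and registered classes are illustrative assumptions, not taken from the original job):

    import org.apache.spark.SparkConf;

    // Use Kryo as the data serializer (shuffle and caching). Registering the
    // classes that actually flow through the shuffle avoids writing full
    // class names with every record.
    SparkConf conf = new SparkConf()
        .setAppName("hfile-bulkload-poc")  // illustrative name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[] { scala.Tuple2.class });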
Re: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
We have not tried that yet; however, both the MR and Spark implementations were tested with the same number of partitions on the same cluster.

On Fri, Oct 23, 2015 at 5:21 PM, 250635...@qq.com <250635...@qq.com> wrote:

> Hi,
>
> Not an expert on this kind of implementation, but referring to the
> performance result: are the map-side partitions a good fit for the
> different datasets? Have you tried increasing the number of partitions?
>
> 250635...@qq.com
>
> From: Li Yang
> Date: 2015-10-23 16:17
> To: dev
> CC: Reynold Xin; dev@spark.apache.org
> Subject: Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
>
> Any advice on how to tune the repartitionAndSortWithinPartitions stage?
> Any particular metrics or parameters to look into? Basically Spark and MR
> shuffle the same amount of data, because we more or less copied the MR
> implementation into Spark.
>
> Let us know if more info is needed.
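On the partition-count suggestion: with repartitionAndSortWithinPartitions the shuffle partition count is whatever the Partitioner reports, so raising it means giving the partitioner more buckets. A hypothetical sketch of the split-key partitioner this thread describes (the class name and the Guava comparator are assumptions):

    import java.util.Arrays;
    import com.google.common.primitives.UnsignedBytes;
    import org.apache.spark.Partitioner;

    // Routes each row key to the HBase region whose split range contains it.
    // numPartitions() is fixed by the split-key count, so more parallelism
    // here would mean pre-splitting the table into more regions.
    public class SplitKeyPartitioner extends Partitioner {
        private final byte[][] splitKeys;  // sorted HTable region split keys

        public SplitKeyPartitioner(byte[][] splitKeys) {
            this.splitKeys = splitKeys;
        }

        @Override
        public int numPartitions() {
            return splitKeys.length + 1;
        }

        @Override
        public int getPartition(Object key) {
            int idx = Arrays.binarySearch(splitKeys, (byte[]) key,
                    UnsignedBytes.lexicographicalComparator());
            // an exact match belongs to the region that starts at that split
            // key; otherwise the insertion point is the region index
            return idx >= 0 ? idx + 1 : -(idx + 1);
        }
    }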
repartitionAndSortWithinPartitions task shuffle phase is very slow
Hi, spark community
        I have an application which I am trying to migrate from MR to Spark. It does some calculation over data from Hive and outputs HFiles, which are then bulk loaded into an HBase table. Details as follows:

        Rdd input = getSourceInputFromHive()
        Rdd<Tuple2<byte[], byte[]>> mapSideResult =
            input.glom().mapPartitions(/* some calculation, equivalent to the MR mapper */)
        // PS: the result in each partition has already been sorted in
        // lexicographical order during the calculation
        mapSideResult
            .repartitionAndSortWithinPartitions(/* partition by byte[][] HTable
                split keys, equivalent to the MR shuffle */)
            .map(/* transform Tuple2<byte[], byte[]> to
                Tuple2<ImmutableBytesWritable, KeyValue>, equivalent to the MR
                reducer without output */)
            .saveAsNewAPIHadoopFile(/* write to HFile */)

        This all works fine on a small dataset, and Spark outruns MR by about 10%. However, when I apply it to a dataset of 150 million records, MR is about 100% faster than Spark (*MR 25 min, Spark 50 min*).
        Looking into the application UI, the repartitionAndSortWithinPartitions stage is very slow: in its shuffle phase, a 6GB shuffle takes about 18 min, which seems quite unreasonable.
        *Can anyone help with this issue and give me some advice on this? It's not iterative processing, but I believe Spark should be at least as fast.*

        Here is the cluster info:
        vm: 8 nodes * (128G mem + 64 cores)
        hadoop cluster: HDP 2.2.6
        spark running mode: yarn-client
        spark version: 1.5.1
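For context, the final write step with HBase's new-API output format usually looks something like this minimal sketch (the output path, variable names, and configuration are assumptions, not from the original code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.spark.api.java.JavaPairRDD;

    // hfileReady is the JavaPairRDD<ImmutableBytesWritable, KeyValue> produced
    // by the map() step above; hbaseConf carries the table and column-family
    // settings that HFileOutputFormat2 reads.
    JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileReady = /* ... */;
    Configuration hbaseConf = /* ... */;
    hfileReady.saveAsNewAPIHadoopFile(
        "/tmp/hfiles",                    // illustrative output path
        ImmutableBytesWritable.class,
        KeyValue.class,
        HFileOutputFormat2.class,
        hbaseConf);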
Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
+kylin dev list

On Fri, Oct 23, 2015 at 10:20 AM, 周千昊 <qhz...@apache.org> wrote:

> Hi, Reynold
>         Using glom() is because it is easy to adapt to the calculation
> logic already implemented in MR. And to be clear, we are still in POC.
>         Since the results show there is almost no difference between this
> glom stage and the MR mapper, using glom here might not be the issue.
>         I was monitoring the network traffic when the repartition happens,
> and it showed that the traffic peaks at about 200 - 300MB/s but stays at
> about 3-4MB/s for a long time. Have you guys got any idea about it?
Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
Hi, Reynold
        Using glom() is because it is easy to adapt to the calculation logic already implemented in MR. And to be clear, we are still in POC.
        Since the results show there is almost no difference between this glom stage and the MR mapper, using glom here might not be the issue.
        I was monitoring the network traffic when the repartition happens, and it showed that the traffic peaks at about 200 - 300MB/s but stays at about 3-4MB/s for a long time. Have you guys got any idea about it?

On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <r...@databricks.com> wrote:

> Why do you do a glom? It seems unnecessarily expensive to materialize
> each partition in memory.
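To make Reynold's point concrete: glom() first copies each partition into an in-memory List and only then hands it to mapPartitions, whereas mapPartitions alone can consume the partition iterator as a stream. A minimal sketch of the streaming form, assuming the Hive input arrives as org.apache.spark.sql.Row, and with a hypothetical MapperIterator standing in for the MR-mapper-equivalent calculation:

    import java.util.Iterator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Row;
    import scala.Tuple2;

    // instead of input.glom().mapPartitions(...), operate on the iterator:
    JavaRDD<Tuple2<byte[], byte[]>> mapSideResult = input.mapPartitions(
        new FlatMapFunction<Iterator<Row>, Tuple2<byte[], byte[]>>() {
            @Override
            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Row> rows) {
                return new Iterable<Tuple2<byte[], byte[]>>() {
                    @Override
                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
                        // MapperIterator (hypothetical) emits results one at a
                        // time while pulling rows from the partition iterator
                        return new MapperIterator(rows);
                    }
                };
            }
        });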
Re: please help with ClassNotFoundException
Hi, Sea
        Problem solved. It turned out that I had updated the spark cluster to 1.4.1 but the client had not been updated. Thank you so much.

On Fri, Aug 14, 2015 at 1:01 PM, Sea <261810...@qq.com> wrote:

> I have no idea... We use scala. You upgraded to 1.4 so quickly... are you
> using spark in production? Spark 1.3 is better than spark 1.4.
>
> ------------------ Original message ------------------
> *From:* 周千昊 <z.qian...@gmail.com>
> *Sent:* Friday, Aug 14, 2015, 11:14 AM
> *To:* Sea <261810...@qq.com>; dev@spark.apache.org
> *Subject:* Re: please help with ClassNotFoundException
>
> Hi Sea
>         I have updated spark to 1.4.1; however, the problem still exists.
> Any idea?

--
Best Regard
ZhouQianhao
avoid creating small objects
Hi, All
        What I want to do is:
        1. read from some source
        2. do some calculation to get some byte array
        3. write the byte array to hdfs
        In hadoop, I can share an ImmutableBytesWritable and do some System.arraycopy; this prevents the application from creating a lot of small objects, which improves gc latency.
        *However I was wondering if there is any solution like the above in spark that can avoid creating small objects*
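One way to get similar reuse in Spark, as a sketch only (the HDFS path, buffer size, and `source` RDD are assumptions for illustration, not from the original poster's job): allocate the buffer once per partition inside foreachPartition, so each task reuses a single byte[] the way an MR task reuses a Writable.

    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.function.VoidFunction;

    // given: JavaRDD<byte[]> source
    source.foreachPartition(new VoidFunction<Iterator<byte[]>>() {
        @Override
        public void call(Iterator<byte[]> records) throws Exception {
            // one output file and one reusable buffer per partition
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataOutputStream out = fs.create(
                new Path("/tmp/out-" + TaskContext.get().partitionId()));
            byte[] buffer = new byte[64 * 1024];  // sized to the max record (illustrative)
            try {
                while (records.hasNext()) {
                    byte[] rec = records.next();
                    // the per-record calculation writes into the shared buffer
                    System.arraycopy(rec, 0, buffer, 0, rec.length);
                    out.write(buffer, 0, rec.length);
                }
            } finally {
                out.close();
            }
        }
    });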
Re: avoid creating small objects
I am thinking of creating a shared object outside the closure and using that object to hold the byte array. Will this work?

On Fri, Aug 14, 2015 at 4:02 PM, 周千昊 <qhz...@apache.org> wrote:

> Hi, All
>         What I want to do is:
>         1. read from some source
>         2. do some calculation to get some byte array
>         3. write the byte array to hdfs
>         In hadoop, I can share an ImmutableBytesWritable and do some
> System.arraycopy; this prevents the application from creating a lot of
> small objects, which improves gc latency.
>         *However I was wondering if there is any solution like the above
> in spark that can avoid creating small objects*
please help with ClassNotFoundException
Hi,
        I am using spark 1.4 and have run into an issue. I am trying to use the aggregate function:

        JavaRDD<String> rdd = /* some rdd */;
        HashMap<Long, TypeA> zeroValue = new HashMap<Long, TypeA>();
        // add initial key-value pairs to zeroValue
        rdd.aggregate(zeroValue,
            new Function2<HashMap<Long, TypeA>, String,
                HashMap<Long, TypeA>>() { /* implementation */ },
            new Function2<HashMap<Long, TypeA>, HashMap<Long, TypeA>,
                HashMap<Long, TypeA>>() { /* implementation */ });

        Here is the stack trace when I run the application:

Caused by: java.lang.ClassNotFoundException: TypeA
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at java.util.HashMap.readObject(HashMap.java:1180)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
        at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
        at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
        at org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
        at org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)

*However, I have checked that TypeA is in the jar file, which is on the classpath.*
*And when I use an empty HashMap as the zeroValue, the exception goes away.*
*Has anyone met the same problem, or can anyone help me with it?*
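A self-contained sketch of the same aggregate call with a stand-in TypeA (the counting logic is purely illustrative). Note, from the stack trace, that the zeroValue is cloned with the Java closure serializer inside RDD.aggregate, which is why every class reachable from it must be on the executor-side classpath, and why an empty HashMap does not trigger the error:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.function.Function2;

    // stand-in for the user's TypeA; it must be Serializable because the
    // zeroValue is round-tripped through Java serialization before the job runs
    class TypeA implements Serializable {
        long count;
    }

    // seqOp folds one String record into the accumulator map; combOp merges
    // two accumulator maps (note combOp takes two maps, not a map and a String)
    HashMap<Long, TypeA> result = rdd.aggregate(
        new HashMap<Long, TypeA>(),
        new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() {
            @Override
            public HashMap<Long, TypeA> call(HashMap<Long, TypeA> acc, String s) {
                TypeA a = acc.get(0L);
                if (a == null) { a = new TypeA(); acc.put(0L, a); }
                a.count += s.length();  // illustrative: total length of all strings
                return acc;
            }
        },
        new Function2<HashMap<Long, TypeA>, HashMap<Long, TypeA>, HashMap<Long, TypeA>>() {
            @Override
            public HashMap<Long, TypeA> call(HashMap<Long, TypeA> m1, HashMap<Long, TypeA> m2) {
                for (Map.Entry<Long, TypeA> e : m2.entrySet()) {
                    TypeA a = m1.get(e.getKey());
                    if (a == null) { m1.put(e.getKey(), e.getValue()); }
                    else { a.count += e.getValue().count; }
                }
                return m1;
            }
        });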
Re: please help with ClassNotFoundException
Hi sea
        Is it the same issue as https://issues.apache.org/jira/browse/SPARK-8368 ?

On Thu, Aug 13, 2015 at 6:52 PM, Sea <261810...@qq.com> wrote:

> Are you using 1.4.0? If yes, use 1.4.1.

--
Best Regard
ZhouQianhao
Re: please help with ClassNotFoundException
Hi Sea
        I have updated spark to 1.4.1; however, the problem still exists. Any idea?

On Fri, Aug 14, 2015 at 12:36 AM, Sea <261810...@qq.com> wrote:

> Yes, I guess so. I have seen this bug before.
>
> ------------------ Original message ------------------
> *From:* 周千昊 <z.qian...@gmail.com>
> *Sent:* Thursday, Aug 13, 2015, 9:30 PM
> *To:* Sea <261810...@qq.com>; dev@spark.apache.org
> *Subject:* Re: please help with ClassNotFoundException
>
> Hi sea
>         Is it the same issue as
> https://issues.apache.org/jira/browse/SPARK-8368 ?

--
Best Regard
ZhouQianhao