Re: Equally split an RDD partition into two partitions at the same node
Hi Pradeep,

That is a good idea. My customized RDDs are similar to NewHadoopRDD. If we have billions of InputSplits, will that become a performance bottleneck? That is, will too much data need to be transferred from the master node to the computing nodes over the network?

Thanks,
Fei

On Mon, Jan 16, 2017 at 2:07 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:

> Usually this kind of thing can be done at a lower level in the InputFormat,
> usually by specifying the max split size. Have you looked into that
> possibility with your InputFormat?
>
> On Sun, Jan 15, 2017 at 9:42 PM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Hi Jasbir,
>>
>> Yes, you are right. Do you have any idea about my question?
>>
>> Thanks,
>> Fei
>>
>> On Mon, Jan 16, 2017 at 12:37 AM, <jasbir.s...@accenture.com> wrote:
>>
>>> Hi,
>>>
>>> Coalesce is used to decrease the number of partitions. If you give a
>>> value of numPartitions greater than the current number of partitions,
>>> I don't think the RDD's number of partitions will be increased.
>>>
>>> Thanks,
>>> Jasbir
>>>
>>> *From:* Fei Hu [mailto:hufe...@gmail.com]
>>> *Sent:* Sunday, January 15, 2017 10:10 PM
>>> *To:* zouz...@cs.toronto.edu
>>> *Cc:* user @spark <u...@spark.apache.org>; dev@spark.apache.org
>>> *Subject:* Re: Equally split an RDD partition into two partitions at the same node
>>>
>>> Hi Anastasios,
>>>
>>> Thanks for your reply. If I just increase numPartitions to be twice as
>>> large, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
>>> keep the data locality? Do I need to define my own Partitioner?
>>>
>>> Thanks,
>>> Fei
>>>
>>> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com> wrote:
>>>
>>> Hi Fei,
>>>
>>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>>
>>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>>
>>> coalesce is mostly used for reducing the number of partitions before
>>> writing to HDFS, but it might still be a narrow dependency (satisfying
>>> your requirements) if you increase the # of partitions.
>>>
>>> Best,
>>> Anastasios
>>>
>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>>>
>>> Dear all,
>>>
>>> I want to equally divide an RDD partition into two partitions. That
>>> means the first half of the elements in the partition will create a new
>>> partition, and the second half will generate another new partition. But
>>> the two new partitions are required to be on the same node as their
>>> parent partition, which helps achieve high data locality.
>>>
>>> Does anyone know how to implement this, or have any hints?
>>>
>>> Thanks in advance,
>>> Fei
>>>
>>> --
>>> -- Anastasios Zouzias
>>>
>>> --
>>> This message is for the designated recipient only and may contain
>>> privileged, proprietary, or otherwise confidential information. If you
>>> have received it in error, please notify the sender immediately and
>>> delete the original. Any other use of the e-mail by you is prohibited.
>>> Where allowed by local law, electronic communications with Accenture and
>>> its affiliates, including e-mail and instant messaging (including
>>> content), may be scanned by our systems for the purposes of information
>>> security and assessment of internal compliance with Accenture policy.
>>>
>>> www.accenture.com
Re: Equally split an RDD partition into two partitions at the same node
Hi Liang-Chi,

Yes, the logic split is needed in compute(). The preferred locations can be derived from the customized Partition class.

Thanks for your help!

Cheers,
Fei

On Mon, Jan 16, 2017 at 3:00 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

> Hi Fei,
>
> I think it should work. But you may need to add some logic in compute() to
> decide which half of the parent partition should be output. And you need to
> get the correct preferred locations for the partitions sharing the same
> parent partition.
>
> Fei Hu wrote
>> Hi Liang-Chi,
>>
>> Yes, you are right. I implemented the following solution for this
>> problem, and it works. But I am not sure whether it is efficient:
>>
>> I double the partitions of the parent RDD, and then use the new
>> partitions and the parent RDD to construct the target RDD. In the
>> compute() function of the target RDD, I use the input partition to get
>> the corresponding parent partition, and return half of the elements of
>> the parent partition as the output of the compute function.
>>
>> Thanks,
>> Fei
>>
>> On Sun, Jan 15, 2017 at 11:01 PM, Liang-Chi Hsieh viirya@ wrote:
>>
>>> Hi,
>>>
>>> When calling `coalesce` with `shuffle = false`, it is going to produce
>>> at most min(numPartitions, previous RDD's number of partitions). So I
>>> think it can't be used to double the number of partitions.
>>>
>>> Anastasios Zouzias wrote
>>>> Hi Fei,
>>>>
>>>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>>>
>>>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>>>
>>>> coalesce is mostly used for reducing the number of partitions before
>>>> writing to HDFS, but it might still be a narrow dependency (satisfying
>>>> your requirements) if you increase the # of partitions.
>>>>
>>>> Best,
>>>> Anastasios
>>>>
>>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu hufei68@ wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>> I want to equally divide an RDD partition into two partitions. That
>>>>> means the first half of the elements in the partition will create a
>>>>> new partition, and the second half will generate another new
>>>>> partition. But the two new partitions are required to be on the same
>>>>> node as their parent partition, which helps achieve high data
>>>>> locality.
>>>>>
>>>>> Does anyone know how to implement this, or have any hints?
>>>>>
>>>>> Thanks in advance,
>>>>> Fei
>>>>
>>>> --
>>>> -- Anastasios Zouzias
>>>> azo@.ibm
>>>
>>> -
>>> Liang-Chi Hsieh | @viirya
>>> Spark Technology Center
>>> http://www.spark.tc/
>>> --
>>> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Equally-split-a-RDD-partition-into-two-partition-at-the-same-node-tp20597p20608.html
>>> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscribe@.apache
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Equally-split-a-RDD-partition-into-two-partition-at-the-same-node-tp20597p20613.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
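Fei's scheme (double the number of partitions; each child partition computes one half of its parent) boils down to a little index arithmetic. Below is a minimal plain-Scala sketch of that arithmetic, outside Spark; the helper names `parentIndex` and `halfOfParent` are illustrative, not Spark API. In a real RDD subclass this logic would live in getPartitions() and compute(), with the preferred locations copied from the parent partition.

```scala
// Child partition i reads from parent partition i / 2:
// children 0 and 1 share parent 0, children 2 and 3 share parent 1, etc.
def parentIndex(childIndex: Int): Int = childIndex / 2

// In compute(), emit the first or second half of the parent's elements,
// depending on whether the child index is even or odd. When the parent
// has an odd number of elements, the second half gets the extra one.
def halfOfParent[T](parentElements: Seq[T], childIndex: Int): Seq[T] = {
  val mid = parentElements.length / 2
  if (childIndex % 2 == 0) parentElements.take(mid)
  else parentElements.drop(mid)
}
```

Because each child depends on exactly one parent partition, the dependency stays narrow, and reporting the parent's location as the child's preferred location keeps both halves on the same node.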
Re: Equally split an RDD partition into two partitions at the same node
Hi Jasbir,

Yes, you are right. Do you have any idea about my question?

Thanks,
Fei

On Mon, Jan 16, 2017 at 12:37 AM, <jasbir.s...@accenture.com> wrote:

> Hi,
>
> Coalesce is used to decrease the number of partitions. If you give a
> value of numPartitions greater than the current number of partitions, I
> don't think the RDD's number of partitions will be increased.
>
> Thanks,
> Jasbir
>
> *From:* Fei Hu [mailto:hufe...@gmail.com]
> *Sent:* Sunday, January 15, 2017 10:10 PM
> *To:* zouz...@cs.toronto.edu
> *Cc:* user @spark <u...@spark.apache.org>; dev@spark.apache.org
> *Subject:* Re: Equally split an RDD partition into two partitions at the same node
>
> Hi Anastasios,
>
> Thanks for your reply. If I just increase numPartitions to be twice as
> large, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
> keep the data locality? Do I need to define my own Partitioner?
>
> Thanks,
> Fei
>
> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com> wrote:
>
> Hi Fei,
>
> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>
> coalesce is mostly used for reducing the number of partitions before
> writing to HDFS, but it might still be a narrow dependency (satisfying
> your requirements) if you increase the # of partitions.
>
> Best,
> Anastasios
>
> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>
> Dear all,
>
> I want to equally divide an RDD partition into two partitions. That means
> the first half of the elements in the partition will create a new
> partition, and the second half will generate another new partition. But
> the two new partitions are required to be on the same node as their
> parent partition, which helps achieve high data locality.
>
> Does anyone know how to implement this, or have any hints?
>
> Thanks in advance,
> Fei
>
> --
> -- Anastasios Zouzias
Re: Equally split an RDD partition into two partitions at the same node
Hi Liang-Chi,

Yes, you are right. I implemented the following solution for this problem, and it works. But I am not sure whether it is efficient:

I double the partitions of the parent RDD, and then use the new partitions and the parent RDD to construct the target RDD. In the compute() function of the target RDD, I use the input partition to get the corresponding parent partition, and return half of the elements of the parent partition as the output of the compute function.

Thanks,
Fei

On Sun, Jan 15, 2017 at 11:01 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

> Hi,
>
> When calling `coalesce` with `shuffle = false`, it is going to produce at
> most min(numPartitions, previous RDD's number of partitions). So I think
> it can't be used to double the number of partitions.
>
> Anastasios Zouzias wrote
>> Hi Fei,
>>
>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>
>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>
>> coalesce is mostly used for reducing the number of partitions before
>> writing to HDFS, but it might still be a narrow dependency (satisfying
>> your requirements) if you increase the # of partitions.
>>
>> Best,
>> Anastasios
>>
>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu hufei68@ wrote:
>>
>>> Dear all,
>>>
>>> I want to equally divide an RDD partition into two partitions. That
>>> means the first half of the elements in the partition will create a new
>>> partition, and the second half will generate another new partition. But
>>> the two new partitions are required to be on the same node as their
>>> parent partition, which helps achieve high data locality.
>>>
>>> Does anyone know how to implement this, or have any hints?
>>>
>>> Thanks in advance,
>>> Fei
>>
>> --
>> -- Anastasios Zouzias
>> azo@.ibm
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Equally-split-a-RDD-partition-into-two-partition-at-the-same-node-tp20597p20608.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
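Liang-Chi's point about `coalesce` with `shuffle = false` can be summarized as a cap: without a shuffle, partitions can only be merged, never split, so the result has at most as many partitions as the parent. A one-line plain-Scala model of that rule (a sketch for intuition, not Spark's actual implementation):

```scala
// With shuffle = false, coalesce can merge partitions but not split them,
// so the resulting count is capped by the parent's partition count.
def coalescedCount(requested: Int, parentPartitions: Int): Int =
  math.min(requested, parentPartitions)
```

This is why asking for twice the current partition count with `shuffle = false` leaves the partitioning unchanged, and why splitting partitions needs a custom RDD instead.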
Re: Equally split an RDD partition into two partitions at the same node
Hi Anastasios,

Thanks for your information. I will look into the CoalescedRDD code.

Thanks,
Fei

On Sun, Jan 15, 2017 at 12:21 PM, Anastasios Zouzias <zouz...@gmail.com> wrote:

> Hi Fei,
>
> I looked at the code of CoalescedRDD, and probably what I suggested will
> not work.
>
> Speaking of which, CoalescedRDD is private[spark]. If this were not the
> case, you could set balanceSlack to 1 and get what you requested; see
>
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L75
>
> Maybe you could try to use the CoalescedRDD code to implement your
> requirement.
>
> Good luck!
> Cheers,
> Anastasios
>
> On Sun, Jan 15, 2017 at 5:39 PM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Hi Anastasios,
>>
>> Thanks for your reply. If I just increase numPartitions to be twice as
>> large, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
>> keep the data locality? Do I need to define my own Partitioner?
>>
>> Thanks,
>> Fei
>>
>> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com> wrote:
>>
>>> Hi Fei,
>>>
>>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>>
>>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>>
>>> coalesce is mostly used for reducing the number of partitions before
>>> writing to HDFS, but it might still be a narrow dependency (satisfying
>>> your requirements) if you increase the # of partitions.
>>>
>>> Best,
>>> Anastasios
>>>
>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I want to equally divide an RDD partition into two partitions. That
>>>> means the first half of the elements in the partition will create a new
>>>> partition, and the second half will generate another new partition. But
>>>> the two new partitions are required to be on the same node as their
>>>> parent partition, which helps achieve high data locality.
>>>>
>>>> Does anyone know how to implement this, or have any hints?
>>>>
>>>> Thanks in advance,
>>>> Fei
>>>
>>> --
>>> -- Anastasios Zouzias
>>> <a...@zurich.ibm.com>
>
> --
> -- Anastasios Zouzias
> <a...@zurich.ibm.com>
Re: Equally split an RDD partition into two partitions at the same node
Hi Anastasios,

Thanks for your reply. If I just increase numPartitions to be twice as large, how does coalesce(numPartitions: Int, shuffle: Boolean = false) keep the data locality? Do I need to define my own Partitioner?

Thanks,
Fei

On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com> wrote:

> Hi Fei,
>
> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>
> coalesce is mostly used for reducing the number of partitions before
> writing to HDFS, but it might still be a narrow dependency (satisfying
> your requirements) if you increase the # of partitions.
>
> Best,
> Anastasios
>
> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Dear all,
>>
>> I want to equally divide an RDD partition into two partitions. That
>> means the first half of the elements in the partition will create a new
>> partition, and the second half will generate another new partition. But
>> the two new partitions are required to be on the same node as their
>> parent partition, which helps achieve high data locality.
>>
>> Does anyone know how to implement this, or have any hints?
>>
>> Thanks in advance,
>> Fei
>
> --
> -- Anastasios Zouzias
> <a...@zurich.ibm.com>
Re: Equally split an RDD partition into two partitions at the same node
Hi Rishi,

Thanks for your reply!

The RDD has 24 partitions, and the cluster has a master node plus 24 computing nodes (12 cores per node). Each node holds one partition, and I want to split each partition into two sub-partitions on the same node, to improve parallelism while keeping high data locality.

Thanks,
Fei

On Sun, Jan 15, 2017 at 2:33 AM, Rishi Yadav <ri...@infoobjects.com> wrote:

> Can you provide some more details:
>
> 1. How many partitions does the RDD have?
> 2. How big is the cluster?
>
> On Sat, Jan 14, 2017 at 3:59 PM Fei Hu <hufe...@gmail.com> wrote:
>
>> Dear all,
>>
>> I want to equally divide an RDD partition into two partitions. That
>> means the first half of the elements in the partition will create a new
>> partition, and the second half will generate another new partition. But
>> the two new partitions are required to be on the same node as their
>> parent partition, which helps achieve high data locality.
>>
>> Does anyone know how to implement this, or have any hints?
>>
>> Thanks in advance,
>> Fei
Equally split an RDD partition into two partitions at the same node
Dear all,

I want to equally divide an RDD partition into two partitions. That means the first half of the elements in the partition will create a new partition, and the second half will generate another new partition. But the two new partitions are required to be on the same node as their parent partition, which helps achieve high data locality.

Does anyone know how to implement this, or have any hints?

Thanks in advance,
Fei
Re: RDD Location
It would be much appreciated if you could give more details about why the runJob function cannot be called inside getPreferredLocations().

In the NewHadoopRDD and HadoopRDD classes, the location information comes from the InputSplits. But there may be an issue in NewHadoopRDD, because it generates all of the InputSplits on the master node. That means I can only use a single node to generate and filter the InputSplits, even when the number of InputSplits is huge. Will that be a performance bottleneck?

Thanks,
Fei

On Fri, Dec 30, 2016 at 10:41 PM, Sun Rui <sunrise_...@163.com> wrote:

> You can’t call runJob inside getPreferredLocations().
>
> You can take a look at the source code of HadoopRDD to help you implement
> getPreferredLocations() appropriately.
>
> On Dec 31, 2016, at 09:48, Fei Hu <hufe...@gmail.com> wrote:
>
>> That is a good idea.
>>
>> I tried adding the following code to the getPreferredLocations() function:
>>
>> val results: Array[Array[DataChunkPartition]] = context.runJob(
>>   partitionsRDD,
>>   (context: TaskContext, partIter: Iterator[DataChunkPartition]) => partIter.toArray,
>>   dd, allowLocal = true)
>>
>> But it seems to be suspended when executing this function. If I move the
>> code to other places, like the main() function, it runs well.
>>
>> What is the reason for this?
>>
>> Thanks,
>> Fei
>>
>> On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui <sunrise_...@163.com> wrote:
>>
>>> Maybe you can create your own subclass of RDD and override
>>> getPreferredLocations() to implement the logic of dynamically changing
>>> the locations.
>>>
>>> On Dec 30, 2016, at 12:06, Fei Hu <hufe...@gmail.com> wrote:
>>>
>>>> Dear all,
>>>>
>>>> Is there any way to change the host location for a certain partition of an RDD?
>>>>
>>>> "protected def getPreferredLocations(split: Partition)" can be used to
>>>> initialize the location, but how can it be changed after the
>>>> initialization?
>>>>
>>>> Thanks,
>>>> Fei
context.runJob() was suspended in the getPreferredLocations() function
Dear all,

I tried to customize my own RDD. In the getPreferredLocations() function, I used the following code to query another RDD, which was used as an input to initialize this customized RDD:

val results: Array[Array[DataChunkPartition]] = context.runJob(
  partitionsRDD,
  (context: TaskContext, partIter: Iterator[DataChunkPartition]) => partIter.toArray,
  partitions, allowLocal = true)

The problem is that when this code executes, the task seems to be suspended: the job just stops at this code, with no errors and no output.

What is the reason for this?

Thanks,
Fei
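One likely explanation (an assumption; the thread does not confirm it) is that getPreferredLocations() is invoked by the scheduler while it is planning a job, so submitting a nested job from inside it via context.runJob() blocks waiting on the very scheduler that called it. A common way around this is to compute the locations eagerly on the driver, before constructing the custom RDD, and store them in the Partition objects so that getPreferredLocations() becomes a pure lookup. A plain-Scala sketch of that pattern (the DataChunkPartition fields and the wrapper class here are hypothetical, not Spark API):

```scala
// Precompute preferred hosts on the driver and carry them inside the
// Partition objects; the locations callback then never runs a job.
case class DataChunkPartition(index: Int, preferredHosts: Seq[String])

class LocatedChunks(partitions: Array[DataChunkPartition]) {
  // Analogue of RDD.getPreferredLocations: a pure field lookup,
  // with no call back into the scheduler.
  def preferredLocations(index: Int): Seq[String] =
    partitions(index).preferredHosts
}
```

In a real RDD subclass, the runJob() call that gathers the location data would run in the driver code that builds the partition array, before the RDD is handed to the scheduler.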
Re: RDD Location
That is a good idea.

I tried adding the following code to the getPreferredLocations() function:

val results: Array[Array[DataChunkPartition]] = context.runJob(
  partitionsRDD,
  (context: TaskContext, partIter: Iterator[DataChunkPartition]) => partIter.toArray,
  dd, allowLocal = true)

But it seems to be suspended when executing this function. If I move the code to other places, like the main() function, it runs well.

What is the reason for this?

Thanks,
Fei

On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui <sunrise_...@163.com> wrote:

> Maybe you can create your own subclass of RDD and override
> getPreferredLocations() to implement the logic of dynamically changing
> the locations.
>
> On Dec 30, 2016, at 12:06, Fei Hu <hufe...@gmail.com> wrote:
>
>> Dear all,
>>
>> Is there any way to change the host location for a certain partition of an RDD?
>>
>> "protected def getPreferredLocations(split: Partition)" can be used to
>> initialize the location, but how can it be changed after the
>> initialization?
>>
>> Thanks,
>> Fei
RDD Location
Dear all,

Is there any way to change the host location for a certain partition of an RDD?

"protected def getPreferredLocations(split: Partition)" can be used to initialize the location, but how can it be changed after the initialization?

Thanks,
Fei
Kryo on Zeppelin
Hi All,

I am running some Spark Scala code in Zeppelin on CDH 5.5.1 (Spark version 1.5.0). I customized the Spark interpreter to use org.apache.spark.serializer.KryoSerializer as spark.serializer, and in the dependencies I added Kryo 3.0.3 as follows:

com.esotericsoftware:kryo:3.0.3

When I run the Scala notebook, I get the following errors. But if I compile the same code into jars and run it on the cluster with spark-submit, it works well without errors.

WARN [2016-10-10 23:43:40,801] ({task-result-getter-1} Logging.scala[logWarning]:71) - Lost task 0.0 in stage 3.0 (TID 9, svr-A3-A-U20): java.io.EOFException
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:196)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

There were also some errors when I ran the Zeppelin Tutorial:

Caused by: java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    ... 3 more
Caused by: java.lang.NullPointerException
    at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:38)
    at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
    at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:142)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)

Does anyone know why this happens?

Thanks in advance,
Fei
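One plausible cause (an assumption, not confirmed in this thread): Spark 1.5 bundles Kryo 2.x through com.twitter.chill, and the extra com.esotericsoftware:kryo:3.0.3 interpreter dependency can shadow it on the notebook's classpath, which would explain chill failing only under Zeppelin while spark-submit (without the extra jar) works. Under that assumption, a sketch of the interpreter setting that relies only on the Kryo Spark already ships:

```properties
# Zeppelin Spark interpreter property (sketch): enable Kryo serialization
# without adding a separate Kryo artifact, so com.twitter.chill sees the
# Kryo version that Spark 1.5 was built against.
spark.serializer org.apache.spark.serializer.KryoSerializer
```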
Spark application Runtime Measurement
Dear all,

I have a question about how to measure the runtime of a Spark application. Here is an example:

- On the Spark UI, the total duration is 2.0 minutes = 120 seconds, as shown below.

[image: Screen Shot 2016-07-09 at 11.45.44 PM.png]

- However, when I check the jobs launched by the application, the time is 13 s + 0.8 s + 4 s = 17.8 seconds, which is much less than 120 seconds. I am not sure which time I should choose to measure the performance of the Spark application.

[image: Screen Shot 2016-07-09 at 11.48.26 PM.png]

- I also checked the event timeline, shown below. There is a big gap between the second job and the third job, and I do not know what happened during that gap.

[image: Screen Shot 2016-07-09 at 11.53.29 PM.png]

Can anyone help explain which time should be used to measure the performance of a Spark application?

Thanks in advance,
Fei
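When the UI numbers disagree like this, one option is to measure on the driver: wall-clock time around the application's actions includes scheduler and driver-side gaps (like the one visible in the event timeline), while the per-job durations do not. A small timing helper, sketched in plain Scala (the timed body below is a stand-in for the Spark actions):

```scala
// Wall-clock timer for a block of driver code. Wrapping all of an
// application's actions in one timed { ... } call captures the gaps
// between jobs as well as the jobs themselves.
def timed[T](body: => T): (T, Double) = {
  val start = System.nanoTime()
  val result = body  // run the work being measured
  val elapsedSeconds = (System.nanoTime() - start) / 1e9
  (result, elapsedSeconds)
}
```

Comparing this driver-side total against the sum of the job durations then shows how much of the runtime is spent outside the jobs (driver code, scheduling, setup) rather than in the tasks themselves.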