Re: Equally split a RDD partition into two partition at the same node

2017-01-16 Thread Fei Hu
Hi Pradeep,

That is a good idea. My customized RDDs are similar to NewHadoopRDD. If we
have billions of InputSplits, will that become a performance bottleneck? That
is, will too much data need to be transferred from the master node to the
computing nodes over the network?

Thanks,
Fei

On Mon, Jan 16, 2017 at 2:07 PM, Pradeep Gollakota <pradeep...@gmail.com>
wrote:

> Usually this kind of thing can be done at a lower level in the InputFormat,
> typically by specifying the max split size. Have you looked into that
> possibility with your InputFormat?
>
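
For concreteness, a rough sketch of this suggestion, assuming a FileInputFormat-based format on the new Hadoop API (the path and the 64 MB cap are illustrative; a fully custom InputFormat may expose a different knob):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object MaxSplitSizeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("max-split-size"))

    // Cap each InputSplit at ~64 MB so large inputs yield more (smaller) splits,
    // and therefore more partitions, without any Spark-side repartitioning.
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)

    val rdd = sc.newAPIHadoopFile(
      "hdfs:///path/to/input",   // illustrative path
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      hadoopConf)

    println(s"Number of partitions: ${rdd.partitions.length}")
  }
}
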
> On Sun, Jan 15, 2017 at 9:42 PM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Hi Jasbir,
>>
>> Yes, you are right. Do you have any idea about my question?
>>
>> Thanks,
>> Fei
>>
>> On Mon, Jan 16, 2017 at 12:37 AM, <jasbir.s...@accenture.com> wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> Coalesce is used to decrease the number of partitions. If you give a
>>> numPartitions value greater than the current number of partitions, I don't
>>> think the RDD's number of partitions will be increased.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jasbir
>>>
>>>
>>>
>>> *From:* Fei Hu [mailto:hufe...@gmail.com]
>>> *Sent:* Sunday, January 15, 2017 10:10 PM
>>> *To:* zouz...@cs.toronto.edu
>>> *Cc:* user @spark <u...@spark.apache.org>; dev@spark.apache.org
>>> *Subject:* Re: Equally split a RDD partition into two partition at the
>>> same node
>>>
>>>
>>>
>>> Hi Anastasios,
>>>
>>>
>>>
>>> Thanks for your reply. If I just increase numPartitions to twice the current
>>> number, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
>>> preserve the data locality? Do I need to define my own Partitioner?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Fei
>>>
>>>
>>>
>>> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com>
>>> wrote:
>>>
>>> Hi Fei,
>>>
>>>
>>>
>>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>>
>>>
>>>
>>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>>
>>>
>>>
>>> coalesce is mostly used for reducing the number of partitions before
>>> writing to HDFS, but it might still be a narrow dependency (satisfying your
>>> requirements) if you increase the # of partitions.
>>>
>>>
>>>
>>> Best,
>>>
>>> Anastasios
>>>
>>>
>>>
>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>>>
>>> Dear all,
>>>
>>>
>>>
>>> I want to divide an RDD partition equally into two partitions. That means
>>> the first half of the elements in the partition will form one new partition,
>>> and the second half will form another new partition. The two new partitions
>>> are required to be on the same node as their parent partition, which helps
>>> achieve high data locality.
>>>
>>>
>>>
>>> Is there anyone who knows how to implement it or any hints for it?
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>> Fei
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> -- Anastasios Zouzias
>>>
>>>
>>>
>>>
>>
>>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-16 Thread Fei Hu
Hi Liang-Chi,

Yes, the split logic is needed in compute(). The preferred locations can be
derived from the customized Partition class.

Thanks for your help!

Cheers,
Fei
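
For reference, a minimal sketch of such a customized Partition (names here are illustrative, not the actual implementation): it records the parent partition and which half it covers, so the preferred locations can simply be taken from the parent RDD's locations for that parent partition.

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD

// Hypothetical partition type: one half (0 or 1) of a parent partition.
class HalfPartition(override val index: Int,
                    val parentPartition: Partition,
                    val half: Int) extends Partition

object HalfPartitionLocations {
  // Inside the custom RDD's getPreferredLocations(), delegate to the parent:
  def locations(parent: RDD[_], split: Partition): Seq[String] =
    parent.preferredLocations(split.asInstanceOf[HalfPartition].parentPartition)
}
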


On Mon, Jan 16, 2017 at 3:00 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Hi Fei,
>
> I think it should work. But you may need to add some logic in compute() to
> decide which half of the parent partition to output. And you need to get the
> correct preferred locations for the partitions sharing the same parent
> partition.
>
>
> Fei Hu wrote
> > Hi Liang-Chi,
> >
> > Yes, you are right. I implemented the following solution for this problem,
> > and it works, but I am not sure whether it is efficient:
> >
> > I double the number of partitions of the parent RDD, and then use the new
> > partitions and the parent RDD to construct the target RDD. In the compute()
> > function of the target RDD, I use the input partition to find the
> > corresponding parent partition and return half of that parent partition's
> > elements as the output of the compute function.
> >
> > Thanks,
> > Fei
> >
> > On Sun, Jan 15, 2017 at 11:01 PM, Liang-Chi Hsieh viirya@ wrote:
> >
> >>
> >> Hi,
> >>
> >> When calling `coalesce` with `shuffle = false`, it is going to produce at
> >> most min(numPartitions, the previous RDD's number of partitions) partitions.
> >> So I think it can't be used to double the number of partitions.
> >>
> >>
> >> Anastasios Zouzias wrote
> >> > Hi Fei,
> >> >
> >> > Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
> >> >
> >> > https://github.com/apache/spark/blob/branch-1.6/core/
> >> src/main/scala/org/apache/spark/rdd/RDD.scala#L395
> >> >
> >> > coalesce is mostly used for reducing the number of partitions before
> >> > writing to HDFS, but it might still be a narrow dependency (satisfying
> >> > your
> >> > requirements) if you increase the # of partitions.
> >> >
> >> > Best,
> >> > Anastasios
> >> >
> >> > On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu hufei68@ wrote:
> >> >
> >> >> Dear all,
> >> >>
> >> >> I want to equally divide a RDD partition into two partitions. That
> >> means,
> >> >> the first half of elements in the partition will create a new
> >> partition,
> >> >> and the second half of elements in the partition will generate
> another
> >> >> new
> >> >> partition. But the two new partitions are required to be at the same
> >> node
> >> >> with their parent partition, which can help get high data locality.
> >> >>
> >> >> Is there anyone who knows how to implement it or any hints for it?
> >> >>
> >> >> Thanks in advance,
> >> >> Fei
> >> >>
> >> >>
> >> >
> >> >
> >> > --
> >> > -- Anastasios Zouzias
> >>
> >>
> >>
> >>
> >> -
> >> Liang-Chi Hsieh | @viirya
> >> Spark Technology Center
> >> http://www.spark.tc/
> >>
> >>
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-15 Thread Fei Hu
Hi Jasbir,

Yes, you are right. Do you have any idea about my question?

Thanks,
Fei

On Mon, Jan 16, 2017 at 12:37 AM, <jasbir.s...@accenture.com> wrote:

> Hi,
>
>
>
> Coalesce is used to decrease the number of partitions. If you give a
> numPartitions value greater than the current number of partitions, I don't
> think the RDD's number of partitions will be increased.
>
>
>
> Thanks,
>
> Jasbir
>
>
>
> *From:* Fei Hu [mailto:hufe...@gmail.com]
> *Sent:* Sunday, January 15, 2017 10:10 PM
> *To:* zouz...@cs.toronto.edu
> *Cc:* user @spark <u...@spark.apache.org>; dev@spark.apache.org
> *Subject:* Re: Equally split a RDD partition into two partition at the
> same node
>
>
>
> Hi Anastasios,
>
>
>
> Thanks for your reply. If I just increase numPartitions to twice the current
> number, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
> preserve the data locality? Do I need to define my own Partitioner?
>
>
>
> Thanks,
>
> Fei
>
>
>
> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com>
> wrote:
>
> Hi Fei,
>
>
>
> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>
>
>
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>
>
>
> coalesce is mostly used for reducing the number of partitions before
> writing to HDFS, but it might still be a narrow dependency (satisfying your
> requirements) if you increase the # of partitions.
>
>
>
> Best,
>
> Anastasios
>
>
>
> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>
> Dear all,
>
>
>
> I want to divide an RDD partition equally into two partitions. That means
> the first half of the elements in the partition will form one new partition,
> and the second half will form another new partition. The two new partitions
> are required to be on the same node as their parent partition, which helps
> achieve high data locality.
>
>
>
> Is there anyone who knows how to implement it or any hints for it?
>
>
>
> Thanks in advance,
>
> Fei
>
>
>
>
>
>
>
> --
>
> -- Anastasios Zouzias
>
>
>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-15 Thread Fei Hu
Hi Liang-Chi,

Yes, you are right. I implemented the following solution for this problem,
and it works, but I am not sure whether it is efficient:

I double the number of partitions of the parent RDD, and then use the new
partitions and the parent RDD to construct the target RDD. In the compute()
function of the target RDD, I use the input partition to find the corresponding
parent partition and return half of that parent partition's elements as the
output of the compute function.

Thanks,
Fei
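
A minimal, untested sketch of the approach described above: twice as many partitions, a narrow dependency so each new partition depends only on its parent partition, compute() returning one half of the parent's elements, and preferred locations copied from the parent so both halves stay on the same node. All names are illustrative, not the actual implementation from this thread.

import scala.reflect.ClassTag
import org.apache.spark.{NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative sketch: each new partition covers one half (0 or 1) of a parent partition.
class HalfPartition(override val index: Int,
                    val parentPartition: Partition,
                    val half: Int) extends Partition

class SplitInTwoRDD[T: ClassTag](parent: RDD[T])
  extends RDD[T](parent.context, Seq(new NarrowDependency[T](parent) {
    // Child partition i depends only on parent partition i / 2.
    override def getParents(partitionId: Int): Seq[Int] = Seq(partitionId / 2)
  })) {

  override protected def getPartitions: Array[Partition] =
    parent.partitions.flatMap { p =>
      Seq[Partition](new HalfPartition(2 * p.index, p, 0),
                     new HalfPartition(2 * p.index + 1, p, 1))
    }

  // Keep both halves on the same node(s) as their parent partition.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    parent.preferredLocations(split.asInstanceOf[HalfPartition].parentPartition)

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val hp = split.asInstanceOf[HalfPartition]
    // Simplest possible split: materialize the parent partition and return one
    // half; each half re-evaluates the parent partition unless it is cached.
    val elems = parent.iterator(hp.parentPartition, context).toArray
    val mid = elems.length / 2
    if (hp.half == 0) elems.iterator.take(mid) else elems.iterator.drop(mid)
  }
}
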

On Sun, Jan 15, 2017 at 11:01 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Hi,
>
> When calling `coalesce` with `shuffle = false`, it is going to produce at
> most min(numPartitions, the previous RDD's number of partitions) partitions.
> So I think it can't be used to double the number of partitions.
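
To see this behavior concretely, a quick spark-shell-style check (the partition counts are illustrative): coalesce with shuffle = false cannot grow the partition count, while repartition (coalesce with shuffle = true) can, but it introduces a shuffle and therefore gives up the parents' locality.

// spark-shell sketch; 8 initial partitions chosen arbitrarily
val rdd = sc.parallelize(1 to 1000, 8)

val narrow   = rdd.coalesce(16)        // shuffle = false by default
val shuffled = rdd.repartition(16)     // same as coalesce(16, shuffle = true)

println(narrow.partitions.length)      // still 8: cannot exceed the parent's count
println(shuffled.partitions.length)    // 16, but produced through a shuffle

println(narrow.toDebugString)          // no shuffle stage in the lineage
println(shuffled.toDebugString)        // shows a ShuffledRDD in the lineage
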
>
>
> Anastasios Zouzias wrote
> > Hi Fei,
> >
> > Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
> >
> > https://github.com/apache/spark/blob/branch-1.6/core/
> src/main/scala/org/apache/spark/rdd/RDD.scala#L395
> >
> > coalesce is mostly used for reducing the number of partitions before
> > writing to HDFS, but it might still be a narrow dependency (satisfying
> > your
> > requirements) if you increase the # of partitions.
> >
> > Best,
> > Anastasios
> >
> > On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu hufei68@ wrote:
> >
> >> Dear all,
> >>
> >> I want to equally divide a RDD partition into two partitions. That
> means,
> >> the first half of elements in the partition will create a new partition,
> >> and the second half of elements in the partition will generate another
> >> new
> >> partition. But the two new partitions are required to be at the same
> node
> >> with their parent partition, which can help get high data locality.
> >>
> >> Is there anyone who knows how to implement it or any hints for it?
> >>
> >> Thanks in advance,
> >> Fei
> >>
> >>
> >
> >
> > --
> > -- Anastasios Zouzias
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-15 Thread Fei Hu
Hi Anastasios,

Thanks for your information. I will look into the CoalescedRDD code.

Thanks,
Fei

On Sun, Jan 15, 2017 at 12:21 PM, Anastasios Zouzias <zouz...@gmail.com>
wrote:

> Hi Fei,
>
> I looked at the code of CoalescedRDD, and what I suggested will probably
> not work.
>
> Speaking of which, CoalescedRDD is private[spark]. If that were not the
> case, you could set balanceSlack to 1 and get what you requested; see
>
> https://github.com/apache/spark/blob/branch-1.6/core/
> src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L75
>
> Maybe you could try to use the CoalescedRDD code to implement your
> requirement.
>
> Good luck!
> Cheers,
> Anastasios
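
Whichever route is taken (reusing the CoalescedRDD logic or writing a custom RDD), the resulting placement can be checked from the driver with the public RDD API; a small spark-shell-style sketch:

// Sketch: print the preferred locations of every partition of an RDD.
// preferredLocations returns hostnames, or an empty list when no preference is known.
def describeLocality(rdd: org.apache.spark.rdd.RDD[_]): Unit =
  rdd.partitions.foreach { p =>
    println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
  }
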
>
>
> On Sun, Jan 15, 2017 at 5:39 PM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Hi Anastasios,
>>
>> Thanks for your reply. If I just increase numPartitions to twice the current
>> number, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
>> preserve the data locality? Do I need to define my own Partitioner?
>>
>> Thanks,
>> Fei
>>
>> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com>
>> wrote:
>>
>>> Hi Fei,
>>>
>>> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>>>
>>> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>>>
>>> coalesce is mostly used for reducing the number of partitions before
>>> writing to HDFS, but it might still be a narrow dependency (satisfying your
>>> requirements) if you increase the # of partitions.
>>>
>>> Best,
>>> Anastasios
>>>
>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I want to equally divide a RDD partition into two partitions. That
>>>> means, the first half of elements in the partition will create a new
>>>> partition, and the second half of elements in the partition will generate
>>>> another new partition. But the two new partitions are required to be at the
>>>> same node with their parent partition, which can help get high data
>>>> locality.
>>>>
>>>> Is there anyone who knows how to implement it or any hints for it?
>>>>
>>>> Thanks in advance,
>>>> Fei
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Anastasios Zouzias
>>> <a...@zurich.ibm.com>
>>>
>>
>>
>
>
> --
> -- Anastasios Zouzias
> <a...@zurich.ibm.com>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-15 Thread Fei Hu
Hi Anastasios,

Thanks for your reply. If I just increase numPartitions to twice the current
number, how does coalesce(numPartitions: Int, shuffle: Boolean = false)
preserve the data locality? Do I need to define my own Partitioner?

Thanks,
Fei

On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com>
wrote:

> Hi Fei,
>
> Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?
>
> https://github.com/apache/spark/blob/branch-1.6/core/
> src/main/scala/org/apache/spark/rdd/RDD.scala#L395
>
> coalesce is mostly used for reducing the number of partitions before
> writing to HDFS, but it might still be a narrow dependency (satisfying your
> requirements) if you increase the # of partitions.
>
> Best,
> Anastasios
>
> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>
>> Dear all,
>>
>> I want to divide an RDD partition equally into two partitions. That means
>> the first half of the elements in the partition will form one new partition,
>> and the second half will form another new partition. The two new partitions
>> are required to be on the same node as their parent partition, which helps
>> achieve high data locality.
>>
>> Is there anyone who knows how to implement it or any hints for it?
>>
>> Thanks in advance,
>> Fei
>>
>>
>
>
> --
> -- Anastasios Zouzias
> <a...@zurich.ibm.com>
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-15 Thread Fei Hu
Hi Rishi,

Thanks for your reply! The RDD has 24 partitions, and the cluster has a
master node plus 24 computing nodes (12 cores per node). Each node will have
one partition, and I want to split each partition into two sub-partitions on
the same node to improve parallelism while keeping high data locality.

Thanks,
Fei


On Sun, Jan 15, 2017 at 2:33 AM, Rishi Yadav <ri...@infoobjects.com> wrote:

> Can you provide some more details:
> 1. How many partitions does the RDD have?
> 2. How big is the cluster?
> On Sat, Jan 14, 2017 at 3:59 PM Fei Hu <hufe...@gmail.com> wrote:
>
>> Dear all,
>>
>> I want to divide an RDD partition equally into two partitions. That means
>> the first half of the elements in the partition will form one new partition,
>> and the second half will form another new partition. The two new partitions
>> are required to be on the same node as their parent partition, which helps
>> achieve high data locality.
>>
>> Is there anyone who knows how to implement it or any hints for it?
>>
>> Thanks in advance,
>> Fei
>>
>>


Equally split a RDD partition into two partition at the same node

2017-01-14 Thread Fei Hu
Dear all,

I want to divide an RDD partition equally into two partitions. That means
the first half of the elements in the partition will form one new partition,
and the second half will form another new partition. The two new partitions
are required to be on the same node as their parent partition, which helps
achieve high data locality.

Is there anyone who knows how to implement it or any hints for it?

Thanks in advance,
Fei


Re: RDD Location

2016-12-30 Thread Fei Hu
It would be much appreciated if you could give more details about why the
runJob function cannot be called in getPreferredLocations().

The NewHadoopRDD and HadoopRDD classes get the location information from the
InputSplit. But there may be an issue in NewHadoopRDD, because it generates all
of the InputSplits on the master node, which means only a single node is used
to generate and filter the InputSplits even when their number is huge. Will
that be a performance bottleneck?

Thanks,
Fei





On Fri, Dec 30, 2016 at 10:41 PM, Sun Rui <sunrise_...@163.com> wrote:

> You can't call runJob inside getPreferredLocations().
> You can take a look at the source code of HadoopRDD to help you implement
> getPreferredLocations() appropriately.
>
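
Following that pointer, the general pattern in HadoopRDD-style code is to capture each InputSplit's hosts in the Partition object when the partitions are built on the driver, so that getPreferredLocations() is just a lookup and never needs to run a job. A rough sketch with hypothetical names:

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.rdd.RDD

// Sketch: built on the driver; records the hosts reported by InputSplit.getLocations
// at the time the splits were computed.
class LocatedPartition(override val index: Int, val hosts: Seq[String]) extends Partition

abstract class LocatedRDD[T: scala.reflect.ClassTag](sc: SparkContext)
  extends RDD[T](sc, Nil) {

  // getPartitions (not shown) would build LocatedPartition objects on the driver,
  // e.g. from inputFormat.getSplits(...), storing split.getLocations in `hosts`.

  // No job runs here; the locations were captured when the partitions were built.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    split.asInstanceOf[LocatedPartition].hosts
}
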
> On Dec 31, 2016, at 09:48, Fei Hu <hufe...@gmail.com> wrote:
>
> That is a good idea.
>
> I tried adding the following code in the getPreferredLocations() function:
>
> val results: Array[Array[DataChunkPartition]] = context.runJob(
>   partitionsRDD, (context: TaskContext, partIter:
> Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)
>
> But it seems to hang when executing this function. If I move the code
> somewhere else, like the main() function, it runs well.
>
> What is the reason for it?
>
> Thanks,
> Fei
>
> On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui <sunrise_...@163.com> wrote:
>
>> Maybe you can create your own subclass of RDD and override the
>> getPreferredLocations() to implement the logic of dynamic changing of the
>> locations.
>> > On Dec 30, 2016, at 12:06, Fei Hu <hufe...@gmail.com> wrote:
>> >
>> > Dear all,
>> >
>> > Is there any way to change the host location for a certain partition of
>> RDD?
>> >
>> > "protected def getPreferredLocations(split: Partition)" can be used to
>> initialize the location, but how to change it after the initialization?
>> >
>> >
>> > Thanks,
>> > Fei
>> >
>> >
>>
>>
>>
>
>


context.runJob() was suspended in getPreferredLocations() function

2016-12-30 Thread Fei Hu
Dear all,

I tried to customize my own RDD. In the getPreferredLocations() function, I
used the following code to query another RDD, which was used as an input to
initialize this customized RDD:

val results: Array[Array[DataChunkPartition]] =
  context.runJob(partitionsRDD, (context: TaskContext, partIter:
  Iterator[DataChunkPartition]) => partIter.toArray, partitions, allowLocal = true)

The problem is that when executing the above code, the task seemed to hang:
the job just stopped at this code, with no errors and no output.

What is the reason for it?

Thanks,
Fei
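
A likely explanation (not confirmed in this thread): getPreferredLocations() is invoked by the scheduler while it is submitting a job, so calling context.runJob() from inside it submits a nested job from within the scheduling path, and the outer submission blocks waiting for a job that cannot be scheduled. A common workaround is to run the helper job once, eagerly, when the custom RDD is constructed on the driver, and let getPreferredLocations() only read the stored result. A rough sketch with illustrative names:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch with illustrative names, not the actual implementation.
abstract class EagerMetadataRDD[T: ClassTag, M](
    sc: SparkContext,
    metadataRDD: RDD[M])   // the RDD previously queried via runJob
  extends RDD[T](sc, Nil) {

  // Driver-side job, run exactly once at construction time (the equivalent of
  // the runJob call above, but outside any scheduler callback).
  protected val metadataPerPartition: Array[Vector[M]] =
    metadataRDD.mapPartitions(it => Iterator(it.toVector)).collect()

  // getPartitions() and getPreferredLocations() can now be pure lookups into
  // metadataPerPartition; no job is ever submitted from inside the scheduler.
}
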


Re: RDD Location

2016-12-30 Thread Fei Hu
That is a good idea.

I tried adding the following code in the getPreferredLocations() function:

val results: Array[Array[DataChunkPartition]] = context.runJob(
  partitionsRDD, (context: TaskContext, partIter:
Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)

But it seems to hang when executing this function. If I move the code
somewhere else, like the main() function, it runs well.

What is the reason for it?

Thanks,
Fei

On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui <sunrise_...@163.com> wrote:

> Maybe you can create your own subclass of RDD and override the
> getPreferredLocations() to implement the logic of dynamic changing of the
> locations.
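
A minimal, untested sketch of that suggestion: a wrapper RDD whose preferred locations come from a mutable, driver-side table that can be updated between jobs (all names and the hostname below are illustrative).

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps an existing RDD one-to-one, but lets the driver override the preferred
// hosts of individual partitions at any time before a job is submitted.
class RelocatableRDD[T: ClassTag](parent: RDD[T]) extends RDD[T](parent) {

  // Driver-side table: partition index -> preferred hosts.
  @volatile private var locationOverrides: Map[Int, Seq[String]] = Map.empty

  def setLocation(partitionIndex: Int, hosts: Seq[String]): Unit =
    locationOverrides += (partitionIndex -> hosts)

  override protected def getPartitions: Array[Partition] = parent.partitions

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    locationOverrides.getOrElse(split.index, parent.preferredLocations(split))

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)
}

// Usage, driver side:
//   val r = new RelocatableRDD(someRdd)
//   r.setLocation(0, Seq("node-07"))   // hypothetical hostname
//   r.count()                          // the scheduler now prefers node-07 for partition 0
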
> > On Dec 30, 2016, at 12:06, Fei Hu <hufe...@gmail.com> wrote:
> >
> > Dear all,
> >
> > Is there any way to change the host location for a certain partition of
> RDD?
> >
> > "protected def getPreferredLocations(split: Partition)" can be used to
> initialize the location, but how to change it after the initialization?
> >
> >
> > Thanks,
> > Fei
> >
> >
>
>
>


RDD Location

2016-12-29 Thread Fei Hu
Dear all,

Is there any way to change the host location for a certain partition of an RDD?

"protected def getPreferredLocations(split: Partition)" can be used to
initialize the location, but how can it be changed after the initialization?


Thanks,
Fei


Kryo on Zeppelin

2016-10-10 Thread Fei Hu
Hi All,

I am running some Spark Scala code on Zeppelin on CDH 5.5.1 (Spark version
1.5.0). I customized the Spark interpreter to use
org.apache.spark.serializer.KryoSerializer as spark.serializer, and in the
dependencies I added Kryo 3.0.3 as follows:
 com.esotericsoftware:kryo:3.0.3
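
For comparison, this is roughly what the same serializer setup looks like in plain Spark code (standard SparkConf settings; the registered classes are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: registering classes keeps the Kryo-serialized form compact.
  .registerKryoClasses(Array(classOf[Array[Int]], classOf[Array[String]]))  // placeholder classes

val sc = new SparkContext(conf)
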


When I wrote the Scala notebook and ran the program, I got the following
errors. But if I compiled the same code into a jar and used spark-submit to run
it on the cluster, it worked well without errors.

WARN [2016-10-10 23:43:40,801] ({task-result-getter-1} Logging.scala[logWarning]:71) - Lost task 0.0 in stage 3.0 (TID 9, svr-A3-A-U20): java.io.EOFException
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:196)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


There were also some errors when I ran the Zeppelin Tutorial:

Caused by: java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
... 3 more

Caused by: java.lang.NullPointerException
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:38)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:142)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)

Does anyone know why this happened?

Thanks in advance,
Fei



Spark application Runtime Measurement

2016-07-09 Thread Fei Hu
Dear all,

I have a question about how to measure the runtime of a Spark application.
Here is an example:


   - On the Spark UI, the total duration is 2.0 minutes = 120 seconds, as
   shown below:

[image: Screen Shot 2016-07-09 at 11.45.44 PM.png]

   - However, when I check the jobs launched by the application, their total
   time is 13 s + 0.8 s + 4 s = 17.8 seconds, which is much less than 120
   seconds. I am not sure which time I should choose to measure the performance
   of the Spark application.

[image: Screen Shot 2016-07-09 at 11.48.26 PM.png]

   - I also checked the event timeline, shown below. There is a big gap
   between the second job and the third job, and I do not know what happened
   during that gap.

[image: Screen Shot 2016-07-09 at 11.53.29 PM.png]

Can anyone help explain which of these is the right time to use when measuring
the performance of a Spark application?

Thanks in advance,
Fei
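
One common way to get an unambiguous number is to time the code on the driver, bracketing whatever is considered part of the application; the per-job durations in the UI exclude driver-side work between jobs, which is typically what such gaps in the event timeline correspond to. A small spark-shell-style sketch, assuming an existing RDD named rdd:

def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label took $seconds%.2f s")
  result
}

// Wall-clock time of a single action (roughly what the jobs view shows):
val n = timed("count") { rdd.count() }

// Wall-clock time of a whole pipeline, including driver-side work between jobs
// (closer to the total duration shown for the application):
timed("whole pipeline") {
  val mapped = rdd.map(x => x)   // hypothetical steps
  mapped.count()
  mapped.take(10)
}
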