Kill spark executor when spark runs specific stage

2018-07-04 Thread Serega Sheypak
Hi, I'm running Spark on YARN. My code is very simple. I want to kill one
executor when "data.repartition(10)" is executed. How can I do it in an
easy way?


val data = sc.sequenceFile[NullWritable, BytesWritable](inputPath)
  .map { case (key, value) =>
    Data.fromBytes(value)
  }

val process = data.repartition(10) // kill one executor here
process
  .map { d =>
    val bytes = d.toByteArray // renamed so it no longer shadows the outer `data`
    (new AvroKey(ByteBuffer.wrap(bytes)), NullWritable.get())
  }
  .saveAsNewAPIHadoopFile[AvroKeyOutputFormat[ByteBuffer]](outputPath)
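One low-ceremony way to do this (a sketch, not an official testing API): have one task in the shuffled stage hard-kill its own JVM, so YARN observes an executor failure at exactly that point. The partition/attempt checks are there to kill only one executor and to avoid killing again when the task is retried.

```scala
import org.apache.spark.TaskContext

// Sketch: crash the JVM of whichever executor runs partition 0 of the
// shuffled data, simulating the loss of one executor at that point.
val withCrash = data.repartition(10).mapPartitions { iter =>
  val ctx = TaskContext.get
  if (ctx.partitionId == 0 && ctx.attemptNumber == 0) {
    // halt() skips shutdown hooks, so YARN sees a hard executor failure
    // and Spark reschedules the lost tasks on surviving executors.
    Runtime.getRuntime.halt(-1)
  }
  iter
}
```

Alternatively, `sc.killExecutor(executorId)` (a developer API on SparkContext) asks the cluster manager to kill a specific executor, if you would rather not crash a JVM from inside a task.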


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-04 Thread Prem Sure
Try RDD.pipe() with your .py script.
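A minimal sketch of that suggestion, assuming a hypothetical `udf.py` that reads one record per line on stdin and writes one transformed line to stdout (ship it to the executors, e.g. with `--files udf.py`):

```scala
// Each partition's records are streamed through the external process;
// the result is an RDD[String] built from the script's stdout lines.
val transformed = data
  .map(_.toString)        // pipe() exchanges newline-delimited text
  .pipe("python udf.py")
```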

Thanks,
Prem

On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri 
wrote:

> Can someone please suggest? Thanks.
>
> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
> wrote:
>
>> Hello Dear Spark User / Dev,
>>
>> I would like to pass a Python user defined function to a Spark job developed
>> in Scala, and have that function's return value flow back into the DF /
>> Dataset API.
>>
>> Can someone please guide me on the best approach to do this? The Python
>> function would mostly be a transformation. I would also like to pass a
>> Java function as a String to the Spark / Scala job, for it to apply to an
>> RDD / DataFrame and return an RDD / DataFrame.
>>
>> Thank you.
>>
>>
>>
>>




Re: Inferring Data driven Spark parameters

2018-07-04 Thread Mich Talebzadeh
Hi Aakash,

For clarification, are you running this in YARN client mode or standalone?

How much total YARN memory is available?

From my experience on a bigger cluster, I found the following incremental
settings useful (CDH 5.9, YARN client), so you can scale yours accordingly:

[1] - 576GB

--num-executors 24

--executor-memory 21G

--executor-cores 4
--conf spark.yarn.executor.memoryOverhead=3000



[2] - 672GB

--num-executors 28

--executor-memory 21G

--executor-cores 4
--conf spark.yarn.executor.memoryOverhead=3000



[3] - 768GB

--num-executors 32

--executor-memory 21G

--executor-cores 4
--conf spark.yarn.executor.memoryOverhead=3000



[4] - 864GB

--num-executors 36

--executor-memory 21G

--executor-cores 4
--conf spark.yarn.executor.memoryOverhead=3000
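For what it's worth, each step's total follows from a simple product, total ≈ num-executors × (executor-memory + memoryOverhead), which you can sanity-check quickly:

```scala
// 21 GB heap + ~3 GB spark.yarn.executor.memoryOverhead ≈ 24 GB of YARN
// memory per executor container.
val perExecutorGB = 21 + 3
val totals = Seq(24, 28, 32, 36).map(n => n * perExecutorGB)
// totals == List(576, 672, 768, 864)
```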



HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 3 Jul 2018 at 08:34, Aakash Basu  wrote:

> Hi,
>
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
>
> I'm using the below parameters from which I know the first chunk is
> cluster dependent and the second chunk is data/code dependent.
>
> --num-executors 4
> --executor-cores 5
> --executor-memory 10G
> --driver-cores 5
> --driver-memory 25G
>
>
> --conf spark.sql.shuffle.partitions=100
> --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>
> I've arrived at these values through my research on the properties and the
> issues I faced, and hence these handles.
>
> My ask here is -
>
> *1) How can I infer, using some formula or code, the values of the second
> chunk, which depends on the data/code?*
> *2) What are the other usable properties/configurations which I can use to
> shorten my job runtime?*
>
> Thanks,
> Aakash.
>


Re: Inferring Data driven Spark parameters

2018-07-04 Thread Prem Sure
Can you share which API your jobs use: core RDDs, SQL, DStreams, etc.?
Refer to the recommendations at
https://spark.apache.org/docs/2.3.0/configuration.html for detailed
configuration options.
Thanks,
Prem

On Wed, Jul 4, 2018 at 12:34 PM, Aakash Basu 
wrote:

> I do not want to change executor/driver cores/memory on the fly within a
> single Spark job; all I want is to make them cluster specific. So I want
> to have a formula with which, depending on the driver and executor
> details, I can work out the values for them before submitting those
> details in the spark-submit.
>
> I more or less know how to achieve the above, as I've previously done that.
>
> What I need is to tweak the other Spark confs depending on the data. Is
> that possible? I mean (just an example), if I have 100+ features, I want
> to double my default spark.driver.maxResultSize to 2G, and similarly for
> other configs. Can that be achieved by any means, for an optimal run on
> that kind of dataset? If yes, how?
>
> On Tue, Jul 3, 2018 at 6:28 PM, Vadim Semenov  wrote:
>
>> You can't change the executor/driver cores/memory on the fly once
>> you've already started a Spark Context.
>> On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu 
>> wrote:
>> >
>> > We aren't using Oozie or similar; moreover, the end to end job will be
>> exactly the same, but the data will be extremely different (number of
>> continuous and categorical columns, vertical size, horizontal size, etc).
>> Hence, if there were a way to calculate the parameters, so that we could
>> simply inspect the data and derive the respective
>> configuration/parameters, it would be great.
>> >
>> > On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke 
>> wrote:
>> >>
>> >> Don't do this within your job. Create different jobs for the different
>> types of workload and orchestrate them using Oozie or similar.
>> >>
>> >> On 3. Jul 2018, at 09:34, Aakash Basu 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Cluster - 5 node (1 Driver and 4 workers)
>> >> Driver Config: 16 cores, 32 GB RAM
>> >> Worker Config: 8 cores, 16 GB RAM
>> >>
>> >> I'm using the below parameters from which I know the first chunk is
>> cluster dependent and the second chunk is data/code dependent.
>> >>
>> >> --num-executors 4
>> >> --executor-cores 5
>> >> --executor-memory 10G
>> >> --driver-cores 5
>> >> --driver-memory 25G
>> >>
>> >>
>> >> --conf spark.sql.shuffle.partitions=100
>> >> --conf spark.driver.maxResultSize=2G
>> >> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
>> >> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>> >>
>> >> I've arrived at these values through my research on the properties and
>> the issues I faced, and hence these handles.
>> >>
>> >> My ask here is -
>> >>
>> >> 1) How can I infer, using some formula or code, the values of the
>> second chunk, which depends on the data/code?
>> >> 2) What are the other usable properties/configurations which I can use
>> to shorten my job runtime?
>> >>
>> >> Thanks,
>> >> Aakash.
>> >
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>
>
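What Aakash describes could be sketched like this (paths, thresholds, and sizing rules below are purely illustrative assumptions): probe a cheap property of the data first, then set the data-dependent confs before the session for the heavy job is created, since most of them cannot be changed afterwards.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical: count features by reading only the CSV header, outside Spark.
val headerPath = "/data/input/header.csv" // assumed path
val numFeatures =
  scala.io.Source.fromFile(headerPath).getLines().next().split(",").length

val spark = SparkSession.builder()
  .config("spark.driver.maxResultSize",
          if (numFeatures > 100) "2g" else "1g")      // illustrative rule
  .config("spark.sql.shuffle.partitions",
          math.max(100, 2 * numFeatures).toString)    // illustrative rule
  .getOrCreate()
```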


Re: [Spark Streaming MEMORY_ONLY] Understanding Dataflow

2018-07-04 Thread Prem Sure
Hoping the points below help clear some of this up.
Received data is not routed through the driver, and executors cannot share
data among themselves directly either, except for accumulator updates, which
are merged via the driver. Tasks/stages are planned around data locality
(local vs. remote), and wide dependencies result in a shuffle, in which
executors fetch shuffle blocks directly from one another, not via the driver.
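To illustrate the one driver-mediated channel mentioned above (a sketch; `rdd` and `isValid` are hypothetical):

```scala
// Executors can only *add* to the accumulator; the merged value is
// readable on the driver alone, and only after an action has run.
val badRecords = sc.longAccumulator("badRecords")
rdd.foreach { rec => if (!isValid(rec)) badRecords.add(1L) }
println(s"bad records: ${badRecords.value}")
```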

On Wed, Jul 4, 2018 at 1:56 PM, thomas lavocat <
thomas.lavo...@univ-grenoble-alpes.fr> wrote:

> Hello,
>
> I have a question on Spark dataflow. If I understand correctly, all
> received data is sent from the executor to the driver of the application
> prior to task creation.
>
> Then the task embedding the data transits from the driver to the executor
> in order to be processed.
>
> As executors cannot exchange data themselves, in a shuffle the data also
> transits through the driver.
>
> Is that correct?
>
> Thomas
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>




