Re: How to address seemingly low core utilization on a spark workload?

2018-11-15 Thread Shahbaz
30k SQL shuffle partitions is extremely high. Cores map to partitions 1 to 1,
and the default value of spark.sql.shuffle.partitions is 200. Set it to 300 or
leave it at the default, see which one gives the best performance, and after
you do that, check how the cores are being used.
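
A small sketch of that comparison, assuming an existing SparkSession `spark` and
some shuffle-heavy DataFrame query `q` from the workload (the output paths are
made up):

    spark.conf.set("spark.sql.shuffle.partitions", "200")   // default value
    q.write.mode("overwrite").parquet("/tmp/shuffle-200")   // hypothetical output path

    spark.conf.set("spark.sql.shuffle.partitions", "300")   // alternative to test
    q.write.mode("overwrite").parquet("/tmp/shuffle-300")

After each run, the Stages tab in the UI shows how many tasks each shuffle stage
produced and how evenly the cores were kept busy.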

Regards,
Shahbaz

On Thu, Nov 15, 2018 at 10:58 PM Vitaliy Pisarev <
vitaliy.pisa...@biocatch.com> wrote:

> Oh, regarding shuffle.partitions being 30k: I don't know. I inherited
> the workload from an engineer who is no longer around and am trying to
> make sense of things in general.
>
> On Thu, Nov 15, 2018 at 7:26 PM Vitaliy Pisarev <
> vitaliy.pisa...@biocatch.com> wrote:
>
>> The quest is dual:
>>
>>
>>    - Increase utilisation - because cores cost money and I want to make
>>    sure that I fully utilise what I pay for. This is very blunt of course,
>>    because there is always I/O and at least some degree of skew. The bottom
>>    line is to do the same thing over the same time, but with fewer (and
>>    better utilised) resources.
>>    - Reduce runtime by increasing parallelism.
>>
>> While not the same, I am looking at these as two sides of the same coin.
>>
>>
>>
>>
>>
>> On Thu, Nov 15, 2018 at 6:58 PM Thakrar, Jayesh <
>> jthak...@conversantmedia.com> wrote:
>>
>>> For that little data, I find spark.sql.shuffle.partitions = 30,000 to be
>>> very high.
>>>
>>> Any reason for that high value?
>>>
>>>
>>>
>>> Do you have a baseline observation with the default value?
>>>
>>>
>>>
>>> Also, setting the job group and job description through the API and
>>> observing them in the UI will help you tie the periods of low utilization
>>> back to the code snippets that produce them.
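
A minimal sketch of that, assuming an existing SparkContext `sc` and a DataFrame
`df` from the workload (the group id, description and column name are made up):

    sc.setJobGroup("sessionize-and-aggregate", "group raw events by session, then count")
    val counts = df.groupBy("sessionId").count()   // placeholder transformation
    counts.count()                                 // the jobs this action triggers show up under the group in the UI
    sc.clearJobGroup()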
>>>
>>>
>>>
>>> Finally, high utilization does not equate to high efficiency.
>>>
>>> It's very likely that for your workload, you may only need 16-128
>>> executors.
>>>
>>> I would suggest getting the partition count for the various
>>> datasets/dataframes/rdds in your code by using
>>>
>>>
>>>
>>> dataset.rdd.getNumPartitions
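
For example, a sketch assuming Datasets named `events` and `sessions` exist at
the relevant points in the job:

    println(s"events:   ${events.rdd.getNumPartitions} partitions")
    println(s"sessions: ${sessions.rdd.getNumPartitions} partitions")
    // for a plain RDD the call is simply someRdd.getNumPartitions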
>>>
>>>
>>>
>>> I would also suggest running a number of tests with different numbers of
>>> executors.
>>>
>>>
>>>
>>> But coming back to the objective behind your quest – are you trying to
>>> maximize utilization in the hope that higher parallelism will reduce your
>>> total runtime?
>>>
>>>
>>>
>>>
>>>
>>> *From: *Vitaliy Pisarev 
>>> *Date: *Thursday, November 15, 2018 at 10:07 AM
>>> *To: *
>>> *Cc: *user , David Markovitz <
>>> dudu.markov...@microsoft.com>
>>> *Subject: *Re: How to address seemingly low core utilization on a spark
>>> workload?
>>>
>>>
>>>
>>> I am working with Parquet files, and the metadata reading there is quite
>>> fast as there are at most 16 files (a couple of GB each).
>>>
>>>
>>>
>>> I find it very hard to answer the question "how many partitions do you
>>> have?", since many Spark operations do not preserve partitioning and I have
>>> a lot of filtering and grouping going on.
>>>
>>> What I *can* say is that I specified spark.sql.shuffle.partitions to
>>> 30,000.
>>>
>>>
>>>
>>> I am not worried that there are not enough partitions to keep the cores
>>> working. Having said that, I do see that high utilisation correlates
>>> heavily with shuffle read/write, whereas low utilisation correlates with no
>>> shuffling.
>>>
>>> This leads me to the conclusion that compared to the amount of
>>> shuffling, the cluster is doing very little work.
>>>
>>>
>>>
>>> The question is what I can do about it.
>>>
>>>
>>>
>>> On Thu, Nov 15, 2018 at 5:29 PM Thakrar, Jayesh <
>>> jthak...@conversantmedia.com> wrote:
>>>
>>> Can you shed more light on what kind of processing you are doing?
>>>
>>>
>>>
>>> One common pattern that I have seen for active core/executor utilization
>>> dropping to zero is while reading ORC data, when the driver seems (I think)
>>> to be doing schema validation.
>>>
>>> In my case I would have hundreds of thousands of ORC data files and
>>> there is dead silence for about 1-2 hours.
>>>
>>> I have tried providing a schema and
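
A hedged sketch of that approach (supplying an explicit schema to the ORC reader
so it is not inferred from the files), assuming a SparkSession `spark`; the
field names and path are illustrative only:

    import org.apache.spark.sql.types._

    val orcSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("payload", StringType)
    ))

    val df = spark.read.schema(orcSchema).orc("/data/events/orc")   // hypothetical path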

Re: How to increase the parallelism of Spark Streaming application?

2018-11-07 Thread Shahbaz
Hi ,

   - Do you have adequate CPU cores allocated to handle the increased
   partitions? Generally, if your Kafka partition count is >= (greater than or
   equal to) the total CPU cores (number of executor instances * cores per
   executor), you get increased task parallelism in the reader phase.
   - However, if you have too many partitions but not enough cores, it will
   eventually slow down the reader (e.g. 100 partitions and only 20 total
   cores).
   - Additionally, the next set of transformations will have their own
   partitions; if they involve a shuffle, spark.sql.shuffle.partitions then
   defines the next level of parallelism. If you do not have any data skew, you
   should get good performance. A sizing sketch follows this list.
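
A rough sizing sketch; the numbers are assumptions chosen only to illustrate the
arithmetic, not recommendations:

    import org.apache.spark.SparkConf

    // With 100 Kafka partitions, 25 executors x 4 cores = 100 total cores lets
    // every partition be consumed in parallel in the reader phase.
    val conf = new SparkConf()
      .setAppName("streaming-parallelism-sizing")
      .set("spark.executor.instances", "25")
      .set("spark.executor.cores", "4")
      .set("spark.sql.shuffle.partitions", "100")   // parallelism of the shuffle stages downstream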


Regards,
Shahbaz

On Wed, Nov 7, 2018 at 12:58 PM JF Chen  wrote:

> I have a Spark Streaming application which reads data from Kafka and saves
> the transformation result to HDFS.
> My original partition number for the Kafka topic is 8, and I repartition the
> data to 100 to increase the parallelism of the Spark job.
> Now I am wondering: if I increase the Kafka partition number to 100 instead
> of repartitioning to 100, will the performance be enhanced? (I know the
> repartition action costs a lot of CPU resources.)
> If I set the Kafka partition number to 100, does it have any negative
> effect on efficiency?
> I only have one production environment, so it's not convenient for me to do
> the test.
>
> Thanks!
>
> Regard,
> Junfeng Chen
>


Re: Spark Streaming heap space out of memory

2016-05-30 Thread Shahbaz
Hi Christian, can you please try whether a 30-second batch interval works for
your case? I think your batches are getting queued up. Regards, Shahbaz

On Tuesday 31 May 2016, Dancuart, Christian <christian.dancu...@rbc.com>
wrote:

> While it has heap space, batches run well below 15 seconds.
>
>
>
> Once it starts to run out of space, processing time takes about 1.5
> minutes. Scheduling delay is around 4 minutes and total delay around 5.5
> minutes. I usually shut it down at that point.
>
>
>
> The number of stages (and pending stages) does seem to be quite high and
> increases over time.
>
>
>
> Id    Description                                   Submitted            Duration  Stages: Succeeded/Total  Tasks: Succeeded/Total
> 4584  foreachRDD at HDFSPersistence.java:52         2016/05/30 16:23:52  1.9 min   36/36 (4964 skipped)     285/285 (28026 skipped)
> 4586  transformToPair at SampleCalculator.java:88   2016/05/30 16:25:02  0.2 s     1/1                      4/4
> 4585  (Unknown Stage Name)                          2016/05/30 16:23:52  1.2 min   1/1                      1/1
> 4582  (Unknown Stage Name)                          2016/05/30 16:21:51  48 s      1/1 (4063 skipped)       12/12 (22716 skipped)
> 4583  (Unknown Stage Name)                          2016/05/30 16:21:51  48 s      1/1                      1/1
> 4580  (Unknown Stage Name)                          2016/05/30 16:16:38  4.0 min   36/36 (4879 skipped)     285/285 (27546 skipped)
> 4581  (Unknown Stage Name)                          2016/05/30 16:16:38  0.1 s     1/1                      4/4
> 4579  (Unknown Stage Name)                          2016/05/30 16:15:53  45 s      1/1                      1/1
> 4578  (Unknown Stage Name)                          2016/05/30 16:14:38  1.3 min   1/1 (3993 skipped)       12/12 (22326 skipped)
> 4577  (Unknown Stage Name)                          2016/05/30 16:14:37  0.8 s     1/1                      1/1
>
> Is this what you mean by pending stages?
>
>
>
> I have taken a few heap dumps but I’m not sure what I am looking at for
> the problematic classes.
>
>
>
> *From:* Shahbaz [mailto:shahzadh...@gmail.com]
> *Sent:* 2016, May, 30 3:25 PM
> *To:* Dancuart, Christian
> *Cc:* user
> *Subject:* Re: Spark Streaming heap space out of memory
>
>
>
> Hi Christian,
>
>
>
>    - What is the processing time of each of your batches? Is it exceeding 15
>    seconds?
>    - How many jobs are queued?
>    - Can you take a heap dump and see which objects are occupying the
>    heap?
>
>
>
> Regards,
>
> Shahbaz
>
>
>
>
>
> On Tue, May 31, 2016 at 12:21 AM, christian.dancu...@rbc.com <
> christian.dancu...@rbc.com> wrote:
>
> Hi All,
>
> We have a spark streaming v1.4/java 8 application that slows down and
> eventually runs out of heap space. The less driver memory, the faster it
> happens.
>
> Appended is our Spark configuration and a snapshot of the heap taken
> using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
> keep growing as we observe. We also tried to use G1GC, but it acts the
> same.
>
> Our dependency graph contains multiple updateStateByKey() calls. For each,
> we explicitly set the checkpoint interval to 240 seconds.
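
For reference, a rough sketch of a per-stream checkpoint interval on a state
stream produced by updateStateByKey; the stream, key/value types and update
function are illustrative, not taken from the application:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.dstream.DStream

    def updateCounts(values: Seq[Long], state: Option[Long]): Option[Long] =
      Some(state.getOrElse(0L) + values.sum)

    // `events: DStream[(String, Long)]` is an assumed input stream
    val state: DStream[(String, Long)] = events.updateStateByKey(updateCounts _)
    state.checkpoint(Seconds(240))   // checkpoint this stream's state every 240 seconds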
>
> We have our batch interval set to 15 seconds; with no delays at the start
> of
> the process.
>
> Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
> spark.streaming.minRememberDuration=180s
> spark.ui.showConsoleProgress=false
> spark.streaming.receiver.writeAheadLog.enable=true
> spark.streaming.unpersist=true
> spark.streaming.stopGracefullyOnShutdown=true
> spark.streaming.ui.retainedBatches=10
> spark.ui.retainedJobs=10
> spark.ui.retainedStages=10
> spark.worker.ui.retainedExecutors=10
> spark.worker.ui.retainedDrivers=10
> spark.sql.ui.retainedExecutions=10
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max=128m
>
> num #instances #bytes  class name
> --
>1:   8828200  565004800  org.apache.spark.storage.RDDInfo
>2:  20794893  499077432  scala.collection.immutable.$colon$colon
>3:   9646097  459928736  [C
>4:   9644398  231465552  java.lang.String
>5:  12760625  20417  java.lang.Integer
>6: 21326  98632  [B
>7:556959   44661232  [Lscala.collection.mutable.HashEntry;
>8:   1179788   37753216
> java.util.concurrent.ConcurrentHashMap$Node
>9:   1169264   37416448  java.util.Hashtable$Entry
>   10:552707   30951592  org.apache.spark.schedu

Re: Spark Streaming heap space out of memory

2016-05-30 Thread Shahbaz
Hi Christian,


   - What is the processing time of each of your batches? Is it exceeding 15
   seconds?
   - How many jobs are queued?
   - Can you take a heap dump and see which objects are occupying the heap?

Regards,
Shahbaz


On Tue, May 31, 2016 at 12:21 AM, christian.dancu...@rbc.com <
christian.dancu...@rbc.com> wrote:

> Hi All,
>
> We have a spark streaming v1.4/java 8 application that slows down and
> eventually runs out of heap space. The less driver memory, the faster it
> happens.
>
> Appended is our Spark configuration and a snapshot of the heap taken
> using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
> keep growing as we observe. We also tried to use G1GC, but it acts the
> same.
>
> Our dependency graph contains multiple updateStateByKey() calls. For each,
> we explicitly set the checkpoint interval to 240 seconds.
>
> We have our batch interval set to 15 seconds; with no delays at the start
> of
> the process.
>
> Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
> spark.streaming.minRememberDuration=180s
> spark.ui.showConsoleProgress=false
> spark.streaming.receiver.writeAheadLog.enable=true
> spark.streaming.unpersist=true
> spark.streaming.stopGracefullyOnShutdown=true
> spark.streaming.ui.retainedBatches=10
> spark.ui.retainedJobs=10
> spark.ui.retainedStages=10
> spark.worker.ui.retainedExecutors=10
> spark.worker.ui.retainedDrivers=10
> spark.sql.ui.retainedExecutions=10
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max=128m
>
> num #instances #bytes  class name
> --
>1:   8828200  565004800  org.apache.spark.storage.RDDInfo
>2:  20794893  499077432  scala.collection.immutable.$colon$colon
>3:   9646097  459928736  [C
>4:   9644398  231465552  java.lang.String
>5:  12760625  20417  java.lang.Integer
>6: 21326  98632  [B
>7:556959   44661232  [Lscala.collection.mutable.HashEntry;
>8:   1179788   37753216
> java.util.concurrent.ConcurrentHashMap$Node
>9:   1169264   37416448  java.util.Hashtable$Entry
>   10:552707   30951592  org.apache.spark.scheduler.StageInfo
>   11:367107   23084712  [Ljava.lang.Object;
>   12:556948   22277920  scala.collection.mutable.HashMap
>   13:  2787   22145568
> [Ljava.util.concurrent.ConcurrentHashMap$Node;
>   14:116997   12167688  org.apache.spark.executor.TaskMetrics
>   15:3604258650200
> java.util.concurrent.LinkedBlockingQueue$Node
>   16:3604178650008
> org.apache.spark.deploy.history.yarn.HandleSparkEvent
>   17:  83328478088  [Ljava.util.Hashtable$Entry;
>   18:3510618425464  scala.collection.mutable.ArrayBuffer
>   19:1169638421336  org.apache.spark.scheduler.TaskInfo
>   20:4461367138176  scala.Some
>   21:2119685087232
> io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>   22:1169634678520
> org.apache.spark.scheduler.SparkListenerTaskEnd
>   23:1076794307160
> org.apache.spark.executor.ShuffleWriteMetrics
>   24: 721624041072
> org.apache.spark.executor.ShuffleReadMetrics
>   25:1172233751136  scala.collection.mutable.ListBuffer
>   26: 814733258920  org.apache.spark.executor.InputMetrics
>   27:1259033021672  org.apache.spark.rdd.RDDOperationScope
>   28: 914552926560  java.util.HashMap$Node
>   29:892917776
> [Lscala.concurrent.forkjoin.ForkJoinTask;
>   30:1169572806968
> org.apache.spark.scheduler.SparkListenerTaskStart
>   31:  21222188568  [Lorg.apache.spark.scheduler.StageInfo;
>   32: 164111819816  java.lang.Class
>   33: 878621405792
> org.apache.spark.scheduler.SparkListenerUnpersistRDD
>   34: 22915 916600  org.apache.spark.storage.BlockStatus
>   35:  5887 895568  [Ljava.util.HashMap$Node;
>   36:   480 82
> [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>   37:  7569 834968  [I
>   38:  9626 770080  org.apache.spark.rdd.MapPartitionsRDD
>   39: 31748 761952  java.lang.Long
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-heap-space-out-of-memory-tp27050.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Shahbaz
   - Do you happen to see how busy the nodes are in terms of CPU, and how
   much heap each executor is allocated?
   - If there is enough capacity, you may want to increase the number of cores
   per executor to 2 and do the needed heap tweaking.
   - How much time did it take to process the 4M+ events? (In the Spark UI,
   you can look at the Duration column.)
   - I believe the reader is quite fast; however, processing could be slower.
   If you click on the job, it gives you a breakdown of execution, result
   serialization, etc. You may want to look at that and drive from there.


Regards,
Shahbaz

On Sun, Mar 6, 2016 at 9:26 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> I have 2 machines in my cluster with the below specifications:
> 128 GB RAM and 8 cores machine
>
> Regards,
> ~Vinti
>
> On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Thanks Supreeth and Shahbaz. I will try adding
>> spark.streaming.kafka.maxRatePerPartition.
>>
>> Hi Shahbaz,
>>
>> Please see comments, inline:
>>
>>
>>    - Which version of Spark are you using? ==> *1.5.2*
>>    - How big is the Kafka cluster? ==> *2 brokers*
>>    - What is the message size and type? ==> *String, around 9,550 bytes*
>>    - How big is the Spark cluster (how many executors, how many cores
>>    per executor)? ==> *2 nodes, 16 executors, 1 core per executor*
>>    - What does your Spark job look like? ==>
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>> val inputStream = messages.map(_._2)
>>
>> val parsedStream = inputStream
>>   .map(line => {
>>     val splitLines = line.split(",")
>>     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>>   })
>>
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>     prev.map(_ +: current).orElse(Some(current))
>>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>   })
>> state.checkpoint(Duration(25000))
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
>> ssc
>> }
>>
>>
>> spark.streaming.backpressure.enabled set it to true and try? ==> *Yes, I had
>> enabled it.*
>> Regards,
>> ~Vinti
>>
>> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>>    - Which version of Spark are you using?
>>>    - How big is the Kafka cluster?
>>>    - What is the message size and type?
>>>    - How big is the Spark cluster (how many executors, how many cores
>>>    per executor)?
>>>    - What does your Spark job look like?
>>>
>>> spark.streaming.backpressure.enabled set it to true and try?
>>>
>>>
>>> Regards,
>>> Shahbaz
>>> +91-9986850670
>>>
>>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth@gmail.com>
>>> wrote:
>>>
>>>> Try setting spark.streaming.kafka.maxRatePerPartition; this can help
>>>> control the number of messages read from Kafka per partition by the Spark
>>>> Streaming consumer.
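
A hedged illustration of what that cap does; the numbers are placeholders. With
8 partitions, a 5-second batch and 10,000 records per second per partition,
each batch is bounded at roughly 8 * 10,000 * 5 = 400,000 records instead of
the whole backlog:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")   // records/second/partition
      .set("spark.streaming.backpressure.enabled", "true")         // let Spark adapt the rate after the first batches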
>>>>
>>>> -S
>>>>
>>>>
>>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am trying to figure out why my Kafka + Spark job is running slowly. I
>>>> found that Spark is consuming all the messages from Kafka in a single
>>>> batch and not sending any messages to the other batches.
>>>>
>>>> Batch Time           Input Size       Scheduling Delay  Processing Time  Status
>>>> 2016/03/05 21:57:05  0 events         -                 -                queued
>>>> 2016/03/05 21:57:00  0 events         -                 -                queued
>>>> 2016/03/05 21:56:55  0 events         -                 -                queued
>>>> 2016/03/05 21:56:50  0 events         -                 -                queued
>>>> 2016/03/05 21:56:45  0 events         -                 -                queued
>>>> 2016/03/05 21:56:40  4039573 events   6 ms              -                processing
>>>>
>>>> Does anyone know how this behavior can be changed so that the messages are
>>>> load-balanced across all the batches?
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>>
>>>
>>
>