One more question about the storage policy: since it is set to MEMORY_ONLY_2,
if an RDD cannot fit in memory, will it be spilled to disk or not?
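For what it's worth, the answer can be read off the storage level's flags: MEMORY_ONLY_2 sets no disk flag, so partitions that do not fit are dropped (and recomputed if needed) rather than spilled. A toy plain-Scala model of those flags (an illustrative sketch, not the real Spark StorageLevel class):

```scala
// Toy model of Spark's StorageLevel flags, for illustration only.
case class StorageLevel(useDisk: Boolean, useMemory: Boolean, replication: Int)

// MEMORY_ONLY_2: in memory only, replicated to 2 nodes, no disk fallback.
val memoryOnly2 = StorageLevel(useDisk = false, useMemory = true, replication = 2)
// MEMORY_AND_DISK spills partitions that do not fit in memory.
val memoryAndDisk = StorageLevel(useDisk = true, useMemory = true, replication = 1)

// A level spills to disk only if its disk flag is set.
def spillsToDisk(level: StorageLevel): Boolean = level.useDisk

println(spillsToDisk(memoryOnly2))   // false: blocks are dropped/recomputed, not spilled
println(spillsToDisk(memoryAndDisk)) // true
```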


On Fri, Feb 14, 2014 at 1:56 PM, Sourav Chandra <
[email protected]> wrote:

> I increased the reduceByKeyAndWindow numPartitions parameter to 12, but
> the combineByKey stage still shows only 3-4 tasks, though I can see the
> foreach stage has 12 tasks.
>
> I am still unable to understand why it is so.
>
> Also, I see an "apply" stage in the UI. I have attached both the stage
> details and stage UI snapshots for your reference. Can you explain what
> this stage is?
>
> Thanks,
> Sourav
>
>
> On Fri, Feb 14, 2014 at 1:20 PM, Tathagata Das <
> [email protected]> wrote:
>
>>
>>
>>
>> On Thu, Feb 13, 2014 at 10:27 PM, Sourav Chandra <
>> [email protected]> wrote:
>>
>>> 1. We are not setting any storage level explicitly, hence I assume it is
>>> using the default policy for streaming, i.e. MEMORY_ONLY_SER, and as it
>>> is a NetworkStream it should be replicated. Correct me if I am wrong.
>>>
>> No, it won't be replicated by default. Use MEMORY_ONLY_2.
>>
>>
>>> 2. reduceByKeyAndWindow is called with 8 (we have an 8-core machine per
>>> worker)
>>>
>> You have to play around with this, but it should be comparable to the
>> number of cores in the cluster. If this number is too big, then
>> performance may go down. So there is a sweet spot; you have to figure it
>> out by testing.
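As a starting point for that tuning, a common rule of thumb is roughly 2-3 tasks per CPU core across the cluster. A throwaway helper sketching that (the multiplier is an assumption to tune empirically, not a fixed rule):

```scala
// Rule-of-thumb starting point for numPartitions in reduceByKeyAndWindow:
// roughly 2-3 tasks per core across the cluster, then tune by testing.
def suggestedPartitions(workers: Int, coresPerWorker: Int, tasksPerCore: Int = 2): Int =
  workers * coresPerWorker * tasksPerCore

// e.g. 2 workers with 8 cores each:
println(suggestedPartitions(workers = 2, coresPerWorker = 8)) // 32
```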
>>
>>
>>>
>>> 3. The batch duration for the streaming context is set to 1 second. I
>>> tried setting it to 500 ms but it did not help.
>>>
>>> In the UI, only two types of stages are present: combineByKey and
>>> foreach, and combineByKey is taking much more time than foreach.
>>>
>>> By looking at the stage UI as you suggested, I can see that although the
>>> foreach stage has 8 tasks, combineByKey has only 3-4 tasks. I assume the
>>> tasks are per core, which implies combineByKey is not utilizing all cores.
>>> What could be the reason for this?
>>>
>>> I have attached the stage UI sorted by the duration column.
>>>
>> Well, it is clear that combineByKey is taking the most time, around
>> 7 seconds. So you need to increase the number of reducers in the
>> reduceByKeyAndWindow operation. That should distribute the computation
>> across all the cores, and therefore speed up the processing of each batch.
>> However, you have to set the batch interval such that batch interval >
>> processing time of each batch. Otherwise, the system cannot process
>> batches as fast as they are arriving, so it gets constantly backlogged.
>> So try increasing the number of reducers as well as increasing the batch
>> interval.
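The stability condition above boils down to a one-line check; a trivial sketch with the numbers from this thread:

```scala
// A streaming app keeps up only if each batch is processed faster than
// batches arrive; otherwise the backlog grows without bound.
def keepsUp(batchIntervalMs: Long, processingTimeMs: Long): Boolean =
  processingTimeMs < batchIntervalMs

println(keepsUp(batchIntervalMs = 1000, processingTimeMs = 7000))  // false: backlogged
println(keepsUp(batchIntervalMs = 10000, processingTimeMs = 7000)) // true
```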
>>
>> Also, you can monitor the batch processing times and end-to-end delay
>> using the StreamingListener interface (see
>> StreamingContext.addStreamingListener in Spark 0.9). If the batch interval
>> is not large enough, you will find that the latency reported by the
>> streaming listener keeps growing.
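A plain-Scala sketch of the kind of monitoring a listener enables (the class and field names here are simplified stand-ins, not the real Spark StreamingListener API):

```scala
// Simplified stand-in for a streaming listener: track scheduling delay
// across batches; a steadily growing delay means the batch interval is
// too small for the workload.
case class BatchInfo(processingTimeMs: Long, schedulingDelayMs: Long)

class DelayMonitor {
  private var delays = Vector.empty[Long]
  def onBatchCompleted(info: BatchInfo): Unit =
    delays :+= info.schedulingDelayMs
  // Delay growing batch after batch => the system cannot keep up.
  def looksBacklogged: Boolean =
    delays.size >= 3 && delays.sliding(2).forall { case Seq(a, b) => b > a }
}

val m = new DelayMonitor
Seq(100L, 900L, 2200L, 4100L).foreach(d => m.onBatchCompleted(BatchInfo(7000, d)))
println(m.looksBacklogged) // true: delay grows every batch
```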
>>
>> Hope this helps.
>>
>> TD
>>
>>
>>>
>>>
>>>
>>>
>>> On Fri, Feb 14, 2014 at 10:56 AM, Tathagata Das <
>>> [email protected]> wrote:
>>>
>>>> Can you tell me more about the structure of your program? As in:
>>>> 1) What storage levels are you using for the input stream?
>>>> 2) How many reducers are you using for the reduceByKeyAndWindow?
>>>> 3) What batch interval and processing times do you see with one machine
>>>> vs. two machines?
>>>>
>>>> A good place to start debugging is the Spark web UI for the Spark
>>>> Streaming application. It should be running on the master at port 4040.
>>>> If you look at the stages there, you should see the same pattern of
>>>> stages repeating. You can figure out the number of tasks in each stage,
>>>> which stage is taking the most time (and is therefore the bottleneck),
>>>> etc. You can also drill down and see where the tasks are running, i.e.
>>>> whether they are using the 32 slots on the new machine or not.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Thu, Feb 13, 2014 at 6:29 PM, Sourav Chandra <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks TD.
>>>>>
>>>>> One more question:
>>>>>
>>>>> We are building real-time analytics using Spark Streaming: we read
>>>>> from Kafka, process the data (flatMap -> map -> reduceByKeyAndWindow ->
>>>>> filter), and then save it to Cassandra (using DStream.foreachRDD).
>>>>> Initially I used a machine with 32 cores and 32 GB and performed load
>>>>> testing with 1 master and 1 worker on the same box. Later I added one
>>>>> more box (32 cores, 16 GB) and launched a worker on it. I set
>>>>> spark.executor.memory=10G in the driver program.
>>>>>
>>>>> I expected the performance to increase linearly, as mentioned in the
>>>>> Spark Streaming video, but it did not.
>>>>>
>>>>> Can you please explain why that is? Also, how can we improve it?
>>>>>
>>>>> Thanks,
>>>>> Sourav
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Feb 14, 2014 at 7:50 AM, Tathagata Das <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Answers inline. Hope these answer your questions.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a couple of questions:
>>>>>>>
>>>>>>> 1. While going through the spark-streaming code, I found that there
>>>>>>> is one configuration parameter in JobScheduler/Generator
>>>>>>> (spark.streaming.concurrentJobs) which is set to 1. There is no
>>>>>>> documentation for this parameter. After setting it to 1000 in the
>>>>>>> driver program, our streaming application's performance improved.
>>>>>>>
>>>>>>
>>>>>> That is a parameter that allows Spark Streaming to launch multiple
>>>>>> Spark jobs simultaneously. While it can improve performance in many
>>>>>> scenarios (as it has in your case), it can actually increase the
>>>>>> processing time of each batch and increase end-to-end latency in
>>>>>> certain scenarios. So it is something that needs to be used with
>>>>>> caution. That said, we should definitely have exposed it in the
>>>>>> documentation.
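For reference, this undocumented knob is set like any other Spark property in the driver; a minimal config sketch (the app name and value here are placeholders):

```scala
import org.apache.spark.SparkConf

// spark.streaming.concurrentJobs (default 1) controls how many streaming
// jobs run concurrently. Use with caution, per the discussion above.
val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.streaming.concurrentJobs", "4")
```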
>>>>>>
>>>>>>
>>>>>>> What is this variable used for? Is it safe to use/tweak this
>>>>>>> parameter?
>>>>>>>
>>>>>>> 2. Can someone explain the usage of the MapOutputTracker and
>>>>>>> BlockManager components? I have gone through Matei's YouTube video
>>>>>>> about Spark internals, but this was not covered in detail.
>>>>>>>
>>>>>>
>>>>>> I am not sure if there is a detailed document anywhere that explains
>>>>>> them, but I can give you a high-level overview of both.
>>>>>>
>>>>>> BlockManager is like a distributed key-value store for large blobs
>>>>>> (called blocks) of data. It has a master-worker architecture (loosely,
>>>>>> it is like the HDFS file system) where the BlockManagers on the workers
>>>>>> store the data blocks and the BlockManagerMaster stores the metadata
>>>>>> about which blocks are stored where. All the cached RDD partitions and
>>>>>> shuffle data are stored and managed by the BlockManager. It also
>>>>>> transfers blocks between the workers as needed (shuffles etc. all
>>>>>> happen through the block manager). Specifically for Spark Streaming,
>>>>>> the data received from outside is stored in the BlockManager of the
>>>>>> worker nodes, and the IDs of the blocks are reported to the
>>>>>> BlockManagerMaster.
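The master/worker split described above can be sketched in a few lines of plain Scala (a toy model; the class and method names are illustrative, not the real BlockManager API):

```scala
import scala.collection.mutable

// Toy model of the master-worker split: the master holds only
// blockId -> location metadata, while the workers hold the actual bytes.
class BlockManagerMaster {
  private val locations = mutable.Map.empty[String, Set[String]]
  def report(blockId: String, host: String): Unit =
    locations(blockId) = locations.getOrElse(blockId, Set.empty) + host
  def whereIs(blockId: String): Set[String] = locations.getOrElse(blockId, Set.empty)
}

class BlockManagerWorker(host: String, master: BlockManagerMaster) {
  private val blocks = mutable.Map.empty[String, Array[Byte]]
  def put(blockId: String, data: Array[Byte]): Unit = {
    blocks(blockId) = data       // store the bytes locally
    master.report(blockId, host) // report only the ID and location
  }
  def get(blockId: String): Option[Array[Byte]] = blocks.get(blockId)
}

val master = new BlockManagerMaster
val w1 = new BlockManagerWorker("worker-1", master)
w1.put("input-0-001", "payload".getBytes)
println(master.whereIs("input-0-001")) // Set(worker-1)
```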
>>>>>>
>>>>>> MapOutputTracker is a simpler component that keeps track of the
>>>>>> location of the output of the map stage, so that workers running the
>>>>>> reduce stage know which machines to pull the data from. It also has a
>>>>>> master-worker structure: the master has full knowledge of the map
>>>>>> output, and the worker component pulls that knowledge on demand from
>>>>>> the master when the reduce tasks are executed on the worker.
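A similarly minimal sketch of the tracker side (again a toy model with hypothetical names, not the real MapOutputTracker API):

```scala
import scala.collection.mutable

// Toy model of MapOutputTracker: the master records where each map
// task's output lives; reducers ask for those locations on demand.
class MapOutputTrackerMaster {
  // (shuffleId, mapId) -> host that holds that map output
  private val outputs = mutable.Map.empty[(Int, Int), String]
  def registerMapOutput(shuffleId: Int, mapId: Int, host: String): Unit =
    outputs((shuffleId, mapId)) = host
  // Which hosts does a reduce task for this shuffle pull from?
  def hostsForShuffle(shuffleId: Int): Seq[String] =
    outputs.collect { case ((s, _), host) if s == shuffleId => host }.toSeq.sorted
}

val tracker = new MapOutputTrackerMaster
tracker.registerMapOutput(shuffleId = 0, mapId = 0, host = "worker-1")
tracker.registerMapOutput(shuffleId = 0, mapId = 1, host = "worker-2")
// A reduce task asks which hosts to pull shuffle 0's map output from:
println(tracker.hostsForShuffle(0)) // List(worker-1, worker-2)
```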
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> 3. Can someone explain the usage of cache w.r.t. Spark Streaming? For
>>>>>>> example, if we do stream.cache(), will the cache remain constant, with
>>>>>>> all the partitions of the RDDs for that stream staying present across
>>>>>>> the nodes, or will it be regularly updated as new batches come in?
>>>>>>>
>>>>>> If you call DStream.persist (cache() simply calls persist()), then all
>>>>>> the RDDs generated by the DStream will be persisted in the cache (in
>>>>>> the BlockManager). As new RDDs are generated and persisted, old RDDs
>>>>>> from the same DStream will fall out of memory, either via LRU eviction
>>>>>> or explicitly if spark.streaming.unpersist is set to true.
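That turnover of per-batch RDDs can be sketched as a capacity-bounded cache that evicts the oldest batch (a loose stand-in for the LRU/unpersist behavior, not Spark's actual implementation):

```scala
import scala.collection.mutable

// Toy sketch of how old per-batch RDDs fall out of the cache: keep at
// most `capacity` entries and evict the oldest batch, loosely mimicking
// LRU eviction / explicit unpersist of old DStream RDDs.
class RddCache(capacity: Int) {
  private val cached = mutable.LinkedHashMap.empty[Long, String] // batchTime -> rdd name
  def persist(batchTime: Long, rdd: String): Unit = {
    cached(batchTime) = rdd
    if (cached.size > capacity) cached -= cached.head._1 // evict the oldest batch
  }
  def isCached(batchTime: Long): Boolean = cached.contains(batchTime)
}

val cache = new RddCache(capacity = 2)
Seq(1000L, 2000L, 3000L).foreach(t => cache.persist(t, s"rdd@$t"))
println(cache.isCached(1000L)) // false: the oldest batch was evicted
println(cache.isCached(3000L)) // true
```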
>>>>>>
>>>>>>
>>>>>>> Thanks,
>>>>>>> --
>>>>>>>
>>>>>>> Sourav Chandra
>>>>>>>
>>>>>>> Senior Software Engineer
>>>>>>>
>>>>>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>>>>>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> o: +91 80 4121 8723
>>>>>>>
>>>>>>> m: +91 988 699 3746
>>>>>>>
>>>>>>> skype: sourav.chandra
>>>>>>>
>>>>>>> Livestream
>>>>>>>
>>>>>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main,
>>>>>>> 3rd Block, Koramangala Industrial Area,
>>>>>>>
>>>>>>> Bangalore 560034
>>>>>>>
>>>>>>> www.livestream.com
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>
>



