Also, regarding the storage policy: since it is set to MEMORY_ONLY_2, if an RDD cannot fit in memory, will it be spilled to disk or not?
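(For reference: with any MEMORY_ONLY_* level, blocks that do not fit in memory are dropped rather than written to disk; only the *_AND_DISK_* levels spill. A sketch against the Spark 0.9-era API; the host and port are placeholders, not values from this thread:)

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[4]", "storage-level-sketch", Seconds(1))

// MEMORY_ONLY_2 keeps two in-memory replicas; blocks that do not fit
// in memory are dropped, not written to disk.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_2)

// To fall back to disk instead of dropping blocks, a *_AND_DISK_* level
// would be needed, e.g.:
// ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
```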
On Fri, Feb 14, 2014 at 1:56 PM, Sourav Chandra <[email protected]> wrote:

> I increased the reduceByKeyAndWindow numPartitions parameter to 12, but
> the combineByKey stage still shows 3/4 tasks, though I can see the
> foreach stage has 12 tasks.
>
> I am still unable to understand why this is so.
>
> I also find some "apply" stage in the UI. I have attached both the stage
> details and stage UI snapshots for your reference. Can you explain what
> this stage is?
>
> Thanks,
> Sourav
>
>
> On Fri, Feb 14, 2014 at 1:20 PM, Tathagata Das <[email protected]> wrote:
>
>> On Thu, Feb 13, 2014 at 10:27 PM, Sourav Chandra <[email protected]> wrote:
>>
>>> 1. We are not setting any storage level explicitly, hence I assume it
>>> is using the default policy for streaming, i.e. MEMORY_ONLY_SER, and
>>> as it is a NetworkStream it should be replicated. Correct me if I am
>>> wrong.
>>
>> No, it won't be. Use MEMORY_ONLY_2.
>>
>>> 2. reduceByKeyAndWindow is called with 8 (we have an 8-core machine
>>> per worker).
>>
>> You have to play around with this, but it should be comparable to the
>> number of cores in the cluster. If this number is too big, performance
>> may go down. So there is a sweet spot; you have to figure it out by
>> testing.
>>
>>> 3. Batch duration for the streaming context is set to 1 sec. I tried
>>> setting it to 500 ms but it did not help.
>>>
>>> In the UI, only 2 types of stages are present - combineByKey and
>>> foreach. And combineByKey is taking much more time compared to
>>> foreach.
>>>
>>> By looking at the stage UI as you suggested, I can see that though the
>>> foreach stage has 8 tasks, combineByKey has only 3/4 tasks. I assume
>>> the tasks are per core, which implies combineByKey is not utilizing
>>> all cores. What could be the reason for this?
>>>
>>> I have attached the stage UI with the duration column sorted.
>>
>> Well, it is clear that combineByKey is taking the most amount of time,
>> at 7 seconds.
>> So you need to increase the number of reducers in the
>> reduceByKeyAndWindow operation. That should distribute the computation
>> across more tasks to use all the cores, and therefore speed up the
>> processing of each batch. However, you have to set the batch interval
>> such that batch interval > processing time of each batch. Otherwise,
>> the system cannot process the data as fast as batches are
>> accumulating, so it is constantly getting backlogged. So try
>> increasing the number of reducers as well as increasing the batch
>> interval.
>>
>> Also, you can monitor the batch processing times and end-to-end delay
>> using the StreamingListener interface (see
>> StreamingContext.addStreamingListener in Spark 0.9). If the batch
>> interval is not large enough, you will find that the latency reported
>> by the streaming listener keeps growing.
>>
>> Hope this helps.
>>
>> TD
>>
>>> On Fri, Feb 14, 2014 at 10:56 AM, Tathagata Das <[email protected]> wrote:
>>>
>>>> Can you tell me more about the structure of your program? As in:
>>>> 1) What storage levels are you using in the input stream?
>>>> 2) How many reducers are you using for the reduceByKeyAndWindow?
>>>> 3) Batch interval and processing times seen with one machine vs. two
>>>> machines.
>>>>
>>>> A good place to start debugging is the Spark web UI for the Spark
>>>> Streaming application. It should be running on the master at port
>>>> 4040. There, if you look at the stages, you should see patterns of
>>>> stages repeating. You can figure out the number of tasks in each
>>>> stage, which stage is taking the most amount of time (and is
>>>> therefore the bottleneck), etc. You can drill down and see where the
>>>> tasks are running, and whether it is using the 32 slots on the new
>>>> machine or not.
>>>>
>>>> TD
>>>>
>>>> On Thu, Feb 13, 2014 at 6:29 PM, Sourav Chandra <[email protected]> wrote:
>>>>
>>>>> Thanks TD.
>>>>> One more question:
>>>>>
>>>>> We are building real-time analytics using Spark Streaming - we read
>>>>> from Kafka, process it (flatMap -> map -> reduceByKeyAndWindow ->
>>>>> filter) and then save to Cassandra (using DStream.foreachRDD).
>>>>> Initially I used a machine with 32 cores and 32 GB and performed
>>>>> load testing, with 1 master and 1 worker on the same box. Later I
>>>>> added one more box and launched a worker on that box (32 cores,
>>>>> 16 GB). I set spark.executor.memory=10G in the driver program.
>>>>>
>>>>> I expected the performance to increase linearly as mentioned in the
>>>>> Spark Streaming video, but it did not help.
>>>>>
>>>>> Can you please explain why this is so? Also, how can we increase it?
>>>>>
>>>>> Thanks,
>>>>> Sourav
>>>>>
>>>>> On Fri, Feb 14, 2014 at 7:50 AM, Tathagata Das <[email protected]> wrote:
>>>>>
>>>>>> Answers inline. Hope these answer your questions.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a couple of questions:
>>>>>>>
>>>>>>> 1. While going through the spark-streaming code, I found out there
>>>>>>> is one configuration in JobScheduler/Generator
>>>>>>> (spark.streaming.concurrentJobs) which is set to 1. There is no
>>>>>>> documentation for this parameter. After setting this to 1000 in
>>>>>>> the driver program, our streaming application's performance
>>>>>>> improved.
>>>>>>
>>>>>> That is a parameter that allows Spark Streaming to launch multiple
>>>>>> Spark jobs simultaneously. While it can improve the performance in
>>>>>> many scenarios (as it has in your case), it can actually increase
>>>>>> the processing time of each batch and increase end-to-end latency
>>>>>> in certain scenarios. So it is something that needs to be used with
>>>>>> caution. That said, we should have definitely exposed it in the
>>>>>> documentation.
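(This undocumented knob is set through the normal Spark configuration. A sketch against the Spark 0.9-era API; the app name, master URL, and the value 4 are illustrative, not recommendations from this thread:)

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: allow a few streaming jobs to run concurrently.
// As noted in the thread, large values can increase per-batch
// processing time and end-to-end latency, so tune with caution.
val conf = new SparkConf()
  .setMaster("local[8]")
  .setAppName("streaming-app")
  .set("spark.streaming.concurrentJobs", "4")

val ssc = new StreamingContext(conf, Seconds(1))
```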
>>>>>>> What is this variable used for? Is it safe to use/tweak this
>>>>>>> parameter?
>>>>>>>
>>>>>>> 2. Can someone explain the usage of the MapOutputTracker and
>>>>>>> BlockManager components? I have gone through the YouTube video of
>>>>>>> Matei about Spark internals but this was not covered in detail.
>>>>>>
>>>>>> I am not sure if there is a detailed document anywhere that
>>>>>> explains them, but I can give you a high-level overview of both.
>>>>>>
>>>>>> BlockManager is like a distributed key-value store for large blobs
>>>>>> (called blocks) of data. It has a master-worker architecture
>>>>>> (loosely, it is like the HDFS file system) where the BlockManager
>>>>>> at the workers stores the data blocks and the BlockManagerMaster
>>>>>> stores the metadata for which blocks are stored where. All the
>>>>>> cached RDDs' partitions and shuffle data are stored and managed by
>>>>>> the BlockManager. It also transfers blocks between the workers as
>>>>>> needed (shuffles etc. all happen through the block manager).
>>>>>> Specifically for Spark Streaming, the data received from outside
>>>>>> is stored in the BlockManager of the worker nodes, and the IDs of
>>>>>> the blocks are reported to the BlockManagerMaster.
>>>>>>
>>>>>> MapOutputTracker is a simpler component that keeps track of the
>>>>>> location of the output of the map stage, so that workers running
>>>>>> the reduce stage know which machines to pull the data from. It
>>>>>> also has a master-worker structure - the master has the full
>>>>>> knowledge of the map output, and the worker component pulls that
>>>>>> knowledge from the master on demand when the reduce tasks are
>>>>>> executed on the worker.
>>>>>>
>>>>>>> 3. Can someone explain the usage of cache w.r.t. Spark Streaming?
>>>>>>> For example, if we do stream.cache(), will the cache remain
>>>>>>> constant with all the partitions of RDDs present across the nodes
>>>>>>> for that stream, or will it be regularly updated as new batches
>>>>>>> come in?
>>>>>>
>>>>>> If you call DStream.persist (cache() is just persist() with the
>>>>>> default storage level), then all RDDs generated by the DStream
>>>>>> will be persisted in the cache (in the BlockManager). As new RDDs
>>>>>> are generated and persisted, old RDDs from the same DStream will
>>>>>> fall out of memory, either by LRU or explicitly if
>>>>>> spark.streaming.unpersist is set to true.
>>>>>>
>>>>>>> Thanks,
>>>>>>> --
>>>>>>>
>>>>>>> Sourav Chandra
>>>>>>>
>>>>>>> Senior Software Engineer
>>>>>>>
>>>>>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>>>>>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> o: +91 80 4121 8723
>>>>>>>
>>>>>>> m: +91 988 699 3746
>>>>>>>
>>>>>>> skype: sourav.chandra
>>>>>>>
>>>>>>> Livestream
>>>>>>>
>>>>>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C
>>>>>>> Main, 3rd Block, Koramangala Industrial Area,
>>>>>>>
>>>>>>> Bangalore 560034
>>>>>>>
>>>>>>> www.livestream.com
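The suggestions in the thread above - passing an explicit partition count to reduceByKeyAndWindow and watching batch timings via a StreamingListener - can be sketched as follows (Spark 0.9-era API; the host, port, window sizes, and partition count are all illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val ssc = new StreamingContext("local[8]", "tuning-sketch", Seconds(1))

val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  // numPartitions controls how many tasks the combineByKey stage gets;
  // make it comparable to the total number of cores in the cluster.
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(1), numPartitions = 16)

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) {
    // If this delay keeps growing, the batch interval is too small
    // relative to the per-batch processing time.
    println("processing delay: " + batch.batchInfo.processingDelay)
  }
})
```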
