Re: Spark Streaming - Design considerations/Knobs
Really good list to brush up on the basics. Just one input, regarding:

> An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point in time only one job is active. So, if one job is executing, the other jobs are queued.

We can have multiple jobs running in a given application at a point in time if they are submitted from different threads. So essentially, in a single-threaded application, the above statement holds true.

Regards,
Sam

On May 24, 2015, at 1:14 PM, Tathagata Das <t...@databricks.com> wrote:
> Blocks are replicated immediately, before the driver launches any jobs using them.
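Sam's point, that jobs submitted from different threads can be active at the same time, can be sketched as follows. This is a minimal sketch rather than a recommended pattern: it assumes Scala `Future`s on the global execution context play the role of the threads, and `countConcurrently` and `attach` are hypothetical helper names. Whether the two jobs actually overlap depends on free cores and on `spark.scheduler.mode` (FIFO by default; FAIR shares resources more evenly). It only parallelizes actions inside one output operation; separate output operations are still run one at a time by the streaming JobScheduler by default.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object ConcurrentActions {
  // Submits two independent actions on the same batch RDD from two threads.
  // Each action is its own Spark job; because they are submitted from different
  // threads, the scheduler can run them at the same time instead of queueing them.
  def countConcurrently(rdd: RDD[String]): (Long, Long) = {
    val total    = Future { rdd.count() }
    val nonEmpty = Future { rdd.filter(_.nonEmpty).count() }
    // Block until both jobs finish so the batch is complete before foreachRDD returns.
    (Await.result(total, Duration.Inf), Await.result(nonEmpty, Duration.Inf))
  }

  // Hypothetical wiring: `lines` stands for any DStream[String] in the application.
  def attach(lines: DStream[String]): Unit =
    lines.foreachRDD { rdd =>
      val (total, nonEmpty) = countConcurrently(rdd)
      println(s"records=$total nonEmptyRecords=$nonEmpty")
    }
}
```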
Re: Spark Streaming - Design considerations/Knobs
Blocks are replicated immediately, before the driver launches any jobs using them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat wrote:
> Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and are moved only on shuffle?
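The replication TD describes is controlled, for receiver-based streams, by the StorageLevel passed when the input DStream is created: the `_2` levels keep two copies of each received block. The sketch below shows that together with the list's "multiple receivers for read parallelism" advice. It is a minimal sketch under assumptions: socket sources stand in for real ones, and the host names, ports and the `buildInput` helper are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object ReplicatedReceivers {
  // "_2" = two copies of every received block: one on the receiving executor and
  // one replica on another executor. This is also socketTextStream's default level.
  val receiverStorage: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

  // One receiver (pinning one core) per source, unioned into a single DStream.
  def buildInput(ssc: StreamingContext, sources: Seq[(String, Int)]): DStream[String] = {
    val streams: Seq[DStream[String]] = sources.map { case (host, port) =>
      ssc.socketTextStream(host, port, receiverStorage)
    }
    ssc.union(streams)
  }

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val ssc = new StreamingContext(new SparkConf().setAppName("replicated-receivers"), Seconds(2))
    // Hypothetical host names and ports.
    val input = buildInput(ssc, Seq(("source-a", 9999), ("source-b", 9999)))
    input.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that two receivers here occupy two cores for the lifetime of the application, which is why spark.cores.max has to account for them.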
Re: Spark Streaming - Design considerations/Knobs
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question:

I don't think I fully understand block replication. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and moved only on shuffle? Does the replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das wrote:
> The map tasks on the blocks are processed in the executors that have the blocks (the one that received the block, and the one where the block was replicated), irrespective of the block interval, unless non-local scheduling kicks in (as you observed next).
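The sizing arithmetic behind this question (how blockInterval and batchInterval set the number of blocks, and therefore tasks, and how receivers eat into spark.cores.max) can be sketched in plain Scala with no Spark dependency. This is an illustrative sketch: the object and method names are hypothetical, and it assumes batchInterval is a multiple of blockInterval and that no repartition is applied.

```scala
object StreamingSizing {
  // Blocks (= RDD partitions = tasks) produced by ONE receiver per batch:
  // N = batchInterval / blockInterval, as in the list of knobs.
  def blocksPerReceiver(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0 && batchIntervalMs >= blockIntervalMs,
      "blockInterval must be positive and no larger than batchInterval")
    batchIntervalMs / blockIntervalMs
  }

  // Partitions of the batch RDD when several receivers' streams are unioned
  // (a UnionRDD keeps all of its parents' partitions).
  def partitionsPerBatch(receivers: Int, batchIntervalMs: Long, blockIntervalMs: Long): Long =
    receivers * blocksPerReceiver(batchIntervalMs, blockIntervalMs)

  // Each receiver pins one core for its whole lifetime, so only the remainder
  // of spark.cores.max is left to run the batch tasks.
  def processingCores(coresMax: Int, receivers: Int): Int = {
    val free = coresMax - receivers
    require(free > 0, s"spark.cores.max=$coresMax leaves no core for processing with $receivers receivers")
    free
  }
}
```

For example, a 2 s batch with the default 200 ms block interval and 3 receivers gives 30 partitions per batch; with spark.cores.max = 8, 5 cores remain, so those 30 tasks run in about 6 waves. Raising blockInterval gives fewer, bigger tasks, which is where spark.locality.wait starts to matter.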
Re: Spark Streaming - Design considerations/Knobs
Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming.

On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat wrote:
> Hi,
>
> I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Are there any other important design considerations that I should take care of?
>
> - A DStream is associated with a single receiver. For attaining read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
> - A receiver runs within an executor. It occupies one core. Ensure that there are enough cores for processing after the receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
> - The receivers are allocated to executors in a round-robin fashion.
> - When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
> - These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
> - An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and that it is probably processed locally.

The map tasks on the blocks are processed in the executors that have the blocks (the one that received the block, and the one where the block was replicated), irrespective of the block interval, unless non-local scheduling kicks in (as you observed next).

> - Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
> - Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.

Yes, for greater parallelism, though it comes at the cost of a shuffle.

> - An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point in time only one job is active. So, if one job is executing, the other jobs are queued.
> - If you have two DStreams, there will be two RDDs formed and two jobs created, which will be scheduled one after the other.
> - To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted.

To further clarify, the number of jobs depends on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.

  dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // ONE Spark job per batch

  dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } }  // TWO Spark jobs per batch

  dstream1.foreachRDD { rdd => rdd.count() } ; dstream2.foreachRDD { rdd => rdd.count() }  // TWO Spark jobs per batch

> - If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver.

You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate.

> - For being fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time.

Incomplete. There are two types of checkpointing: data and metadata. Only data checkpointing, which is needed by only some operations, increases batch processing time. Read - http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

Furthermore, with checkpointing you can recover the computation, but for some sources you may lose some data (data that was received but not processed before the driver failed). Enabling write-ahead logs together with a reliable source + receiver allows zero data loss. Read - WAL in http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics

> - The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But, data checkpoint cleaning happens automatically when the RDDs in the checkpoint are n
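TD's corrections on rate limiting, repartitioning and fault tolerance can be combined into one driver setup. This is a minimal sketch under stated assumptions: the checkpoint path, source host/port, rate limit and partition count are example values, and `socketTextStream` only stands in for a reliable receiver-based source (a socket is not one, so this alone does not give zero data loss). `StreamingContext.getOrCreate` rebuilds the context from the metadata checkpoint after a driver restart and only calls `createContext` when no checkpoint exists. With the write-ahead log enabled, the programming guide recommends a non-replicated storage level, since the log already makes the received data durable.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FaultTolerantApp {
  // Hypothetical path; in production it must be on a fault-tolerant file system such as HDFS.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("fault-tolerant-streaming")
      // Caps the records per second accepted by EACH receiver (example value).
      .set("spark.streaming.receiver.maxRate", "10000")
      // Writes received data to a write-ahead log; requires a checkpoint directory.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  // Only called when no checkpoint exists yet; after a driver restart,
  // getOrCreate rebuilds the DStream graph from the metadata checkpoint instead.
  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(buildConf(), Seconds(2))
    // Metadata is checkpointed here; stateful DStreams (e.g. updateStateByKey)
    // also write their data checkpoints here.
    ssc.checkpoint(checkpointDir)
    // The WAL already makes received data durable, so no in-memory replica ("_2") is requested.
    // A socket is NOT a reliable source; it only stands in for one (hypothetical host/port).
    val lines = ssc.socketTextStream("source-host", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    // Explicit processing parallelism, at the cost of a shuffle every batch (example value).
    lines.repartition(8).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

All DStream setup lives inside `createContext` because, on recovery, the graph comes from the checkpoint and any DStreams defined outside that function would not be restored.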