Blocks are replicated immediately, before the driver launches any jobs using them.
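
For illustration, a minimal sketch of where that replication is configured - the storage level passed when the input stream is created. The host, port and batch interval below are just placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("replication-sketch")
val ssc = new StreamingContext(conf, Seconds(2))

// The storage level decides how many copies of each received block are kept.
// MEMORY_AND_DISK_SER_2 (the default for socketTextStream) keeps 2 replicas,
// i.e. each block is copied to a second executor as soon as it is stored.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

With a single-replica level such as StorageLevel.MEMORY_ONLY_SER, the blocks would stay on the receiving executor only.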
On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat <hemant9...@gmail.com> wrote:

> Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question:
>
> I don't think I understand the block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and moved only on shuffle? Does the replication have anything to do with locality.wait?
>
> Thanks,
> Hemant
>
> On Thu, May 21, 2015 at 2:21 AM, Tathagata Das <t...@databricks.com> wrote:
>
>> Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming.
>>
>> On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat <hemant9...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Any other important design consideration that I should take care of?
>>>
>>> - A DStream is associated with a single receiver. For attaining read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
>>> - A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
>>> - The receivers are allocated to executors in a round-robin fashion.
>>> - When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
>>> - These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
>>> - An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and it is probably processed locally.
>>
>> The map tasks on the blocks are processed in the executors (the one that received the block, and the one where the block was replicated) that have the blocks, irrespective of the block interval, unless non-local scheduling kicks in (as you observed next).
>>
>>> - Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
>>> - Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.
>>
>> Yes, for greater parallelism. Though it comes at the cost of a shuffle.
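
To make the read-parallelism and repartition points above concrete, here is a minimal sketch, reusing the StreamingContext (ssc) from the earlier snippet; the number of streams, host/port and partition count are arbitrary placeholders, not recommendations:

// Each socketTextStream call creates its own receiver, so this starts 3
// receivers and therefore occupies 3 executor cores just for receiving.
val numStreams = 3
val streams = (1 to numStreams).map(_ => ssc.socketTextStream("localhost", 9999))

// Union them into one DStream, then repartition to decouple the processing
// parallelism from batchInterval/blockInterval (at the cost of a shuffle).
val unified = ssc.union(streams)
val repartitioned = unified.repartition(12)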
>>> - An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point of time only one job is active. So, if one job is executing, the other jobs are queued.
>>> - If you have two dstreams, there will be two RDDs formed and there will be two jobs created, which will be scheduled one after another.
>>> - To avoid this, you can union the two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted.
>>
>> To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.
>>
>> dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark job per batch
>>
>> dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } }  // TWO Spark jobs per batch
>>
>> dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count }  // TWO Spark jobs per batch
>>
>>> - If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up and it will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver.
>>
>> You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate.
>>
>>> - To be fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time.
>>
>> Incomplete. There are two types of checkpointing - data and metadata. Only data checkpointing, needed by only some operations, increases batch processing time. Read http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>> Furthermore, with checkpointing you can recover the computation, but you may lose some data (that was received but not processed before the driver failed) for some sources. Enabling write-ahead logs and a reliable source + receiver allows zero data loss. Read about the WAL in http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
>>
>>> - The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required.
>>
>> Incorrect. Metadata checkpointing (or DStream checkpointing) is self-cleaning. What you are probably talking about is cleaning of shuffle and other data in the executors. That can be cleaned using spark.cleaner.ttl, but it is a brute-force hammer and can clean more stuff than you intend; it's not recommended to use that. Rather, Spark has GC-triggered cleaning of all that: when RDD objects are GCed, their shuffle data, cached data, etc. are also cleaned in the executors. You can trigger GC-based cleaning by calling System.gc() in the driver periodically.
>>
>>> Thanks,
>>> Hemant
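
Putting the rate-limit, WAL and checkpointing knobs from this thread in one place, a minimal sketch; the rate, flag and directory values are placeholders, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("fault-tolerance-sketch")
  // Throttle each receiver to at most 10000 records/sec so that a slow batch
  // cannot make the receiver fill up executor memory indefinitely.
  .set("spark.streaming.receiver.maxRate", "10000")
  // Write received blocks to a write-ahead log for zero data loss
  // (requires a reliable source/receiver and a fault-tolerant filesystem).
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// Directory used for metadata and data checkpoints (and the WAL files).
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")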