Re: Spark Streaming - Design considerations/Knobs

2015-05-24 Thread Maiti, Samya
Really good list for brushing up on the basics.

Just one input, regarding:

  *   An RDD's processing is scheduled by the driver's JobScheduler as a job. At a 
given point in time only one job is active, so if one job is executing, the 
other jobs are queued.

We can have multiple jobs running in a given application at the same point in 
time if they are submitted from different threads. So essentially the above 
statement holds true only for a single-threaded driver application.
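
For illustration, a minimal sketch (mine, not from the thread), assuming a 
SparkContext is already up and two RDDs named rddA and rddB exist; each action 
is submitted from its own driver thread, so both jobs can be active at once:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    // Two actions submitted from two different driver threads; the scheduler
    // can have both jobs active concurrently.
    val job1 = Future { rddA.count() }
    val job2 = Future { rddB.count() }

    Await.result(job1, Duration.Inf)
    Await.result(job2, Duration.Inf)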

Regards,
Sam


Re: Spark Streaming - Design considerations/Knobs

2015-05-24 Thread Tathagata Das
Blocks are replicated immediately, before the driver launches any jobs
using them.


Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I don't think I understand block replication completely. Are the blocks
replicated immediately after they are received by the receiver? Or are they
kept only on the receiver node and moved only on shuffle? Does the replication
have something to do with spark.locality.wait?

Thanks,
Hemant


Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Hemant Bhanawat
Hi,

I have compiled a list (from online sources) of knobs/design considerations
that need to be taken care of by applications running on Spark Streaming.
Is my understanding correct? Are there any other important design
considerations I should take care of?


   - A DStream is associated with a single receiver. To attain read
   parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
   - A receiver runs within an executor and occupies one core. Ensure that
   there are enough cores left for processing after the receiver slots are
   booked, i.e. spark.cores.max should take the receiver slots into account.
   - The receivers are allocated to executors in a round-robin fashion.
   - When data is received from a stream source, the receiver creates blocks
   of data. A new block of data is generated every blockInterval milliseconds.
   N blocks of data are created during the batchInterval, where
   N = batchInterval / blockInterval.
   - These blocks are distributed by the BlockManager of the current executor
   to the block managers of other executors. After that, the Network Input
   Tracker running on the driver is informed about the block locations for
   further processing.
   - An RDD is created on the driver for the blocks created during the
   batchInterval. The blocks generated during the batchInterval are partitions
   of the RDD. Each partition is a task in Spark. blockInterval == batchInterval
   would mean that a single partition is created and it is probably processed
   locally.
   - Having a bigger blockInterval means bigger blocks. A high value of
   spark.locality.wait increases the chance of processing a block on the local
   node. A balance needs to be found between these two parameters to ensure
   that the bigger blocks are processed locally.
   - Instead of relying on batchInterval and blockInterval, you can define the
   number of partitions by calling dstream.repartition(n). This reshuffles the
   data in the RDD randomly to create n partitions.
   - An RDD's processing is scheduled by the driver's JobScheduler as a job.
   At a given point in time only one job is active, so if one job is executing,
   the other jobs are queued.
   - If you have two DStreams, there will be two RDDs formed and two jobs
   created, which will be scheduled one after the other.
   - To avoid this, you can union the two DStreams. This ensures that a single
   UnionRDD is formed from the two RDDs of the DStreams. This UnionRDD is then
   considered as a single job. However, the partitioning of the RDDs is not
   impacted.
   - If the batch processing time is more than the batchInterval, then
   obviously the receiver's memory will start filling up and it will end up
   throwing exceptions (most probably BlockNotFoundException). Currently there
   is no way to pause the receiver.
   - For being fully fault tolerant, Spark Streaming needs checkpointing to be
   enabled. Checkpointing increases the batch processing time.
   - The frequency of metadata checkpoint cleaning can be controlled using
   spark.cleaner.ttl. But data checkpoint cleaning happens automatically when
   the RDDs in the checkpoint are no longer required.
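
To make the core and interval knobs above concrete, here is a rough
configuration sketch (my own illustration, not from the thread; the app name
and values are made up). With two receivers, spark.cores.max must leave cores
free for processing, and batchInterval / blockInterval fixes the number of
blocks (partitions) per batch per receiver:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("streaming-knobs-example")
      .set("spark.cores.max", "8")                   // 2 cores for 2 receivers + 6 for processing
      .set("spark.streaming.blockInterval", "200ms") // block size knob (200ms is the default)

    // batchInterval = 2s, blockInterval = 200ms => ~10 blocks (partitions)
    // per batch for each receiver.
    val ssc = new StreamingContext(conf, Seconds(2))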



Thanks,
Hemant


Re: Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Tathagata Das
Correcting the ones that are incorrect or incomplete. But this is a good list
of things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design considerations
 that need to be taken care of by applications running on Spark Streaming. Is
 my understanding correct? Are there any other important design considerations
 I should take care of?


- A DStream is associated with a single receiver. To attain read parallelism,
  multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver runs within an executor and occupies one core. Ensure that there
  are enough cores left for processing after the receiver slots are booked,
  i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks of
  data. A new block of data is generated every blockInterval milliseconds.
  N blocks of data are created during the batchInterval, where
  N = batchInterval / blockInterval.
- These blocks are distributed by the BlockManager of the current executor to
  the block managers of other executors. After that, the Network Input Tracker
  running on the driver is informed about the block locations for further
  processing.
- An RDD is created on the driver for the blocks created during the
  batchInterval. The blocks generated during the batchInterval are partitions
  of the RDD. Each partition is a task in Spark. blockInterval == batchInterval
  would mean that a single partition is created and it is probably processed
  locally.

The map tasks on the blocks are processed in the executors that have the
blocks (the one that received a block and the one where it was replicated),
irrespective of the block interval, unless non-local scheduling kicks in (as
you observed next).
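
Tying the receiver bullets above together, a minimal sketch (my own
illustration; hosts and ports are made up, and ssc is assumed to be an
existing StreamingContext): one DStream per receiver for read parallelism,
unioned so that each batch produces a single job:

    // Two receivers, i.e. two input DStreams, each occupying one executor core.
    val stream1 = ssc.socketTextStream("host1", 9999)
    val stream2 = ssc.socketTextStream("host2", 9999)

    // Union them so every batch is processed as one job over all partitions.
    val unioned = stream1.union(stream2)
    unioned.foreachRDD { rdd => println(rdd.count()) }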


- Having a bigger blockInterval means bigger blocks. A high value of
  spark.locality.wait increases the chance of processing a block on the local
  node. A balance needs to be found between these two parameters to ensure
  that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the
  number of partitions by calling dstream.repartition(n). This reshuffles the
  data in the RDD randomly to create n partitions.

Yes, for greater parallelism, though it comes at the cost of a shuffle.
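
For example (illustrative only; assumes a DStream named lines and 16 available
processing cores), decoupling processing parallelism from blockInterval:

    // Redistribute each batch's RDD into 16 partitions. This costs a shuffle,
    // but lets all 16 cores work on the batch regardless of how many blocks
    // the receiver produced.
    val repartitioned = lines.repartition(16)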


- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a
  given point in time only one job is active, so if one job is executing, the
  other jobs are queued.

- If you have two DStreams, there will be two RDDs formed and two jobs
  created, which will be scheduled one after the other.

- To avoid this, you can union the two DStreams. This ensures that a single
  UnionRDD is formed from the two RDDs of the DStreams. This UnionRDD is then
  considered as a single job. However, the partitioning of the RDDs is not
  impacted.

To further clarify, the number of jobs depends on the number of output
operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in
those output operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }  // TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() }  // TWO Spark jobs per batch

- If the batch processing time is more than the batchInterval, then obviously
  the receiver's memory will start filling up and it will end up throwing
  exceptions (most probably BlockNotFoundException). Currently there is no way
  to pause the receiver.

You can limit the rate of the receiver using the SparkConf config
spark.streaming.receiver.maxRate.
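
For instance (a sketch; the value is arbitrary and conf is an existing
SparkConf), capping ingestion at 1000 records per second per receiver:

    // The limit applies per receiver; with 2 receivers the total ingest
    // cap is 2000 records/sec.
    conf.set("spark.streaming.receiver.maxRate", "1000")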


- For being fully fault tolerant, Spark Streaming needs checkpointing to be
  enabled. Checkpointing increases the batch processing time.

Incomplete. There are two types of checkpointing: data and metadata. Only data
checkpointing, needed by only some operations, increases the batch processing
time. Read
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Furthermore, with checkpointing you can recover the computation, but you may
lose some data (data that was received but not processed before the driver
failed) for some sources. Enabling write-ahead logs plus a reliable source and
receiver allows zero data loss. Read about the WAL in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
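
A rough sketch of how these pieces fit together (my own illustration; the
checkpoint path and the createContext body are assumptions): enable
checkpointing to a reliable directory, turn on the receiver write-ahead log,
and recreate the context from the checkpoint on driver restart:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("checkpointed-app")
        // With the WAL enabled, received data is also written to the checkpoint
        // directory, so a reliable source + receiver gives zero data loss.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint("hdfs:///checkpoints/my-app")  // metadata + data checkpoint dir
      // define input DStreams and output operations here
      ssc
    }

    // On restart, recover the checkpointed computation instead of rebuilding it.
    val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/my-app", createContext _)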


- The frequency of metadata checkpoint cleaning can be controlled using
  spark.cleaner.ttl. But data checkpoint cleaning happens automatically when
  the RDDs in the checkpoint are no longer required.


 Incorrect. metadata checkpointing or