GitHub user emres opened a pull request:
https://github.com/apache/spark/pull/5978
SPARK-7441 Implement microbatch functionality so that Spark Streaming can
process a large backlog of existing files, discovered in a single batch, in
smaller micro-batches
Implement microbatch functionality so that Spark Streaming can process a
large backlog of existing files, discovered in a single batch, in smaller
micro-batches.
Spark Streaming can process files that already exist in a monitored
directory, and depending on the value of "spark.streaming.minRememberDuration"
(60 seconds by default; see
[SPARK-3276](https://issues.apache.org/jira/browse/SPARK-3276) for more
details), a Spark Streaming application can receive thousands, or even
hundreds of thousands, of files within the first batch interval. This, in
turn, produces a 'flooding' effect: the streaming application tries to deal
with a huge number of existing files in a single batch interval.
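For context, a minimal sketch of raising that remember window so a restarted
application still picks up older existing files (the duration string format
is an assumption; check the Spark configuration docs for your version):

```scala
import org.apache.spark.SparkConf

// Remember files up to 10 minutes old instead of the default 60 seconds,
// so a restarted application still sees a recent backlog of files.
val conf = new SparkConf()
  .setAppName("FileStreamApp")
  .set("spark.streaming.minRememberDuration", "600s")
```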
We propose a simple change to
`org.apache.spark.streaming.dstream.FileInputDStream`, controlled by a new
configuration property, `spark.streaming.microbatch.size`. With its default
value of 0 the behavior is unchanged: all files discovered as new in the
current batch interval are processed at once. With a positive value, new
files are processed in groups of at most `spark.streaming.microbatch.size`
(e.g. in groups of 100), as sketched below.
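A minimal sketch of the grouping idea, assuming a hypothetical helper along
the lines of what `FileInputDStream` would need (names and placement are
illustrative, not the actual patch):

```scala
import org.apache.spark.SparkConf

object MicroBatchGrouping {
  /**
   * Split the newly discovered files into the group to process in this
   * batch interval and the remainder to defer to later intervals.
   * A size of 0 (the default) keeps the existing behavior: take everything.
   */
  def selectFilesForBatch(
      newFiles: Seq[String],
      conf: SparkConf): (Seq[String], Seq[String]) = {
    val microBatchSize = conf.getInt("spark.streaming.microbatch.size", 0)
    if (microBatchSize <= 0) {
      (newFiles, Seq.empty) // default: process all discovered files at once
    } else {
      newFiles.splitAt(microBatchSize) // take at most microBatchSize files now
    }
  }
}
```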
We have tested this patch at one of our customers' deployments, and it has
been running successfully for weeks. For example, there were cases where our
Spark Streaming application was stopped while tens of thousands of files
were created in the monitored directory, and the application had to process
all of those existing files once it was restarted.
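For illustration, this is how an application might enable the proposed
property once the patch is applied (the property exists only with this
change; the path and batch interval are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BacklogFriendlyFileStream")
  // Proposed in this PR: drain the file backlog in groups of 100.
  .set("spark.streaming.microbatch.size", "100")

val ssc = new StreamingContext(conf, Seconds(30))

// Monitors the directory; with the patch, a large backlog of existing
// files is processed 100 files per batch interval instead of all at once.
val lines = ssc.textFileStream("hdfs:///data/incoming")
lines.count().print()

ssc.start()
ssc.awaitTermination()
```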
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/emres/spark SPARK-7441
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5978.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5978
----
commit 0b6e1ccb0dfdaa3dbfa90287230173be7f09f20d
Author: emres <[email protected]>
Date: 2015-05-07T11:59:06Z
SPARK-7441 Added the micro-batch size functionality.
This helps when a Streaming application starts while a very large backlog
of existing files is waiting to be processed, e.g. tens of thousands or
hundreds of thousands of files; normally this would flood the Streaming
application. This commit adds the spark.streaming.microbatch.size
configuration property. Its default value is 0, which keeps the existing
behavior. If its value is positive, e.g. 100, then instead of trying to
process all of the files discovered during the batch interval at once, the
system processes them in micro-batches (of 100 files each, in this example).
commit d235eae5e57ebbe1eee2ce41b0a4b1aa6e8be474
Author: emres <[email protected]>
Date: 2015-05-07T12:05:48Z
SPARK-7441 Removed the accidentally added hs_err_pid15820.log
----