FYI, SPARK-30294 is merged and will be available in Spark 3.1.0. This
halves the number of temp files for the state store when you use
streaming aggregation.


On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim <> wrote:

> I can't spend too much time on explaining one by one. I strongly encourage
> you to do a deep-dive instead of just looking around as you want to know
> about "details" - that's how open source works.
> I'll go through a general explanation instead of replying inline; probably
> I'd write a blog doc if there's no existing doc (I guess there should be
> one) instead of putting too much time here.
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault-tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
> Spark should be aware of the offsets the query has read for batch 4, and
> preferably the offsets the query read for batch 5. They're offsets/commits.
> State is for storing accumulated values on stateful operations. Same here
> - Spark should be able to read the state for batch 4 so that it can
> calculate the new accumulated values for batch 5. In addition, the number
> of partitions is the max parallelism (partitions aren't aware of each other
> and they shouldn't be), hence the state for each partition should be stored
> individually.
> Storing 4 files (in the end we'll only have "2" files, but here I count
> temp files with crc files, as we are talking about the performance aspect)
> per partition per micro-batch is the thing I already explained - I agree
> it's not ideal, e.g. I submitted the PR for SPARK-30294 [1] which reduces
> the number of files by half. Probably we could propose that Hadoop skip
> creating CRC files (I'm not sure it can be simply done as of now), but
> Spark is conservative about upgrading the versions of dependencies so it
> might not be available soon even if we address it right away.
> As you've found here, it's super important to find the right value of
> shuffle partitions. Data is partitioned by a hash function, so it strongly
> depends on the group key. If the cardinality of the group key is low,
> the right value of shuffle partitions is probably fairly small.
> Unfortunately, once the query has run you can't change the value of shuffle
> partitions, as Spark has no state migration feature for when the number of
> partitions changes. Either you need to predict the overall cardinality at a
> specific time and set the right value, or try to use a 3rd party state
> tool. [2] (DISCLAIMER: I'm the author.)
> 1.
> 2.
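
A rough illustration of the point about group-key cardinality: with hash
partitioning, a query with only a handful of distinct group keys can put
data into at most that many partitions, while (per the counts reported in
this thread) files are still created for every partition. The sketch below
uses CRC32 as a stand-in hash, which is an assumption for illustration and
not Spark's own hash function, and the key names are hypothetical.

```python
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    """Map a group key to a partition using a stand-in hash (CRC32, not Spark's)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def occupied_partitions(keys, num_partitions):
    """Return a Counter of partition id -> number of keys hashed to it."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# Hypothetical workload: 6 distinct group keys, e.g. six 2-minute window buckets.
keys = [f"window-{i}" for i in range(6)]

for n in (200, 10, 5, 1):
    used = occupied_partitions(keys, n)
    print(f"{n:>3} partitions -> {len(used)} receive data, {n - len(used)} stay empty")
```

The partition count itself is controlled by `spark.sql.shuffle.partitions`,
and as noted above it has to be chosen before the query first runs against a
given checkpoint.
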
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev <> wrote:
>> Hi Jungtaek,
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>> There are two:
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir
>> for the app>)
>> contains only empty subdir with GUID name
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", <checkpoint dir
>> for writer>)
>> contains all the files
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>> Breakdown of file creations within them, per 69 microbatches (when
>> shuffle partition count = 200) is:
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>> (Creation is identified by strace record for "openat" system call with
>> O_CREAT flag and file path in the corresponding directory.)
>> When shuffle partition count is 10, breakdown of file creations within
>> them, per 69 microbatches, is:
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
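
To make the two breakdowns comparable, the counts can be normalized per
micro-batch and per shuffle partition. This is only arithmetic on the
numbers quoted above; the variable and function names are made up for the
example.

```python
# Counts copied from the strace breakdowns above (69 micro-batches per run).
MICRO_BATCHES = 69
runs = {
    200: {"commits": 136, "metadata": 0, "offsets": 138, "state": 56232},
    10: {"commits": 136, "metadata": 0, "offsets": 138, "state": 2760},
}


def per_batch(count, batches=MICRO_BATCHES):
    """File creations per micro-batch."""
    return count / batches


def state_per_batch_per_partition(count, partitions, batches=MICRO_BATCHES):
    """State-directory file creations per micro-batch per shuffle partition."""
    return count / batches / partitions


for partitions, counts in runs.items():
    bookkeeping = per_batch(counts["commits"] + counts["offsets"])
    state = state_per_batch_per_partition(counts["state"], partitions)
    print(f"partitions={partitions}: "
          f"commits+offsets per batch={bookkeeping:.2f}, "
          f"state files per batch per partition={state:.2f}")
```

Both runs come out at roughly 4 state-file creations per micro-batch per
partition, plus about 4 commit/offset file creations per micro-batch that do
not depend on the partition count.
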
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input or small inputs), etc.*
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
>> large enough: the whole problem is that Spark does not want to save the
>> microbatch as a single file. Even after I reduce the number of shuffle
>> partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>> ..........
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in checkpoint store and 202 files in Spark local-dir.
>> Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
>> delta files.
>> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
>> Of local-dir files: 200 temp_shuffle files per microbatch (as expected)
>> and 2 other files (
>> If I reduce the number of shuffle partitions, two things happen:
>> - Throughput of a single pipeline improves.
>> - CPU usage by the pipeline is reduced (allowing a single node to co-run
>> a larger number of pipelines).
>> Most of the improvements are gained by the time the number of partitions
>> is reduced to 5-10.
>> Going below that, further improvements are marginal.
>> When reducing the number of shuffle partitions from 200 to 10, physical
>> latency of data ingestion into the checkpoint is reduced 1.9 times, and CPU
>> usage is reduced 2.6 times.
>> When reducing the number of shuffle partitions from 200 to 5, physical
>> latency of data ingestion into the checkpoint is reduced 2.1 times, and CPU
>> usage is reduced 4.5 times.
>> Still, latency remains high, because the number of created files per
>> microbatch remains high.
>> ..........
>> With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
>> checkpoint store and 6.9 files in Spark local-dir.
>> Of checkpoint files: 0.15 per microbatch are snapshot files, and 19.7 are
>> delta files.
>> Of local-dir files: 4.93 temp_shuffle files per microbatch (as expected)
>> and 2 other files (
>> Why would Spark need to create 20 delta files per microbatch, or to put
>> it another way: 4 delta files per microbatch per shuffle partition?
>> One could try to guess this could be due to changing "timestamp", but
>> this does not bear out. In my produced stream (69 microbatches) there are
>> only 46 distinct values for timestamp, consecutively increasing from first
>> timestamp to last. Thus lots of microbatches will have just one timestamp
>> value. On average, a microbatch will have 1.5 distinct timestamp values. But
>> it would, of course, be terribly wrong for Spark to use the raw timestamp
>> value as a key, as in the real world almost every event would have a unique
>> timestamp, so the number of files required for saving by timestamp as a key
>> would be insane. Hopefully Spark does not attempt to do that. But perhaps
>> it may use the index of the 2-minute window bucket as a key. If so, there are
>> only 6 distinct values in the whole event set (I have window size set to 2
>> minutes, watermark 10 minutes, and event set spans 11.5 minutes). Thus, 90%
>> of microbatches will fall wholly in just one window bucket, and 10% in two
>> buckets. So why 4 delta files per microbatch per shuffle partition?
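
The window-bucket count mentioned above can be checked with a small sketch
of tumbling-window assignment. The timestamps here are hypothetical: 46
distinct values, 15 seconds apart, with the first one assumed to fall on a
window boundary. The real event times are not given in the thread.

```python
WINDOW_SECONDS = 120  # 2-minute tumbling window, as in the message above


def window_start(event_ts: int, window: int = WINDOW_SECONDS) -> int:
    """Return the start of the tumbling window that contains event_ts (seconds)."""
    return event_ts - (event_ts % window)


# Hypothetical event times: 46 distinct values, 15 s apart, starting on a boundary.
timestamps = [i * 15 for i in range(46)]

buckets = sorted({window_start(ts) for ts in timestamps})
print(len(timestamps), "distinct timestamps ->", len(buckets), "window buckets")
```

Under these assumptions the events fall into 6 window buckets. A run that
does not start on a window boundary could touch one more.
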
>> ..........
>> For the completeness of the picture, if I run the test with the shuffle
>> partition count set to 1, then:
>> Spark creates 8 files (per microbatch) in the checkpoint store and 3
>> files in Spark local-dir.
>> Of checkpoint files: 0.03 per microbatch are snapshot files (only 2
>> snapshot files in the whole run), and 4 delta files per microbatch.
>> Of local-dir files: 1 temp_shuffle file per microbatch (as expected) and
>> 2 other files (
>> ..........
>> Thus Spark SS seems to keep 4 delta files per microbatch per shuffle
>> partition, no matter what the number of shuffle partitions is.
>> Why would it have to do this?
>> Also unsure why Spark has to create local-dir files for every microbatch,
>> rather than keeping them open across microbatches and re-using from one
>> microbatch to another (writing data over, but without having to go through
>> file creation)...
>> *> Spark leverages HDFS API which is configured to create crc file per
>> file by default. *
>> This is unfortunate. It would be much better to create checkpoint files
>> with the HDFS CRC "shadow file" disabled, and instead have the CRC (if
>> desired) right inside the main file itself, rather than as a separate file.
>> * * *
>> While we are at it, I wished to ask a related question. Suppose I create
>> multiple parallel streaming pipelines in the application, a large number,
>> let us say dozens or hundreds, where by pipeline I mean the whole data
>> stream from the initial Dataset/DatasetReader to the output of the Spark
>> query.
>> How would Spark process them in terms of threading model? Will there be a
>> separate thread per active stream/query or does Spark use a bounded thread
>> pool? Do many streams/queries result in many threads, or a limited number
>> of threads?
>> Thanks,
>> Sergey
