FYI, SPARK-30294 is merged and will be available in Spark 3.1.0. This
reduces the number of temp files for the state store by half when you use
streaming aggregation.

1. https://issues.apache.org/jira/browse/SPARK-30294

On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> I can't spend too much time explaining each point one by one. I strongly
> encourage you to do a deep dive into the code instead of just looking around,
> since you want to know about the "details" - that's how open source works.
>
> I'll give a general explanation instead of replying inline; if there's no
> existing doc (I'd guess there is one), I may write a blog post rather than
> spending too much time here.
>
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
>
> Spark needs to know the offsets the query had read through batch 4, and
> preferably also the offsets it was reading for batch 5. That's what
> offsets/commits are for. State is for storing the accumulated values of
> stateful operations. Same here - Spark should be able to read the state as
> of batch 4 so that it can calculate the new accumulated values for batch 5.
> In addition, partitions define the maximum parallelism (partitions aren't
> aware of each other, and they shouldn't be), hence the state for each
> partition has to be stored individually.
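>
> To make this concrete, with the default HDFS-backed state store the
> checkpoint location ends up with roughly the following layout (a sketch;
> the operator id is typically 0 for a single stateful operator, and
> .snapshot files are only written periodically):
>
>   <checkpointLocation>/
>     metadata                               <- query identity
>     offsets/<batchId>                      <- offsets planned for the batch
>     commits/<batchId>                      <- written once the batch completes
>     state/<operatorId>/<partitionId>/<version>.delta
>     state/<operatorId>/<partitionId>/<version>.snapshot   (periodic)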
>
> Storing 4 files per partition per micro-batch (in the end only "2" files
> remain, but here I count temp files and crc files as well, since we are
> talking about the performance aspect) is the thing I already explained - I
> agree it's not ideal, which is why I submitted the PR for SPARK-30294 [1],
> which reduces the number of files by half. We could probably propose that
> Hadoop skip creating CRC files (I'm not sure it can simply be done as of
> now), but Spark is conservative about upgrading dependency versions, so even
> if we addressed it right away it might not be available soon.
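>
> For illustration only: Hadoop's FileSystem client API does expose a
> per-instance switch for skipping the .crc side files, but whether Spark's
> checkpoint file manager could simply use it is exactly the open question
> above, so treat this as a sketch of the Hadoop-level knob rather than
> something you can turn on in Spark as of now:
>
>   import org.apache.hadoop.conf.Configuration;
>   import org.apache.hadoop.fs.FileSystem;
>   import org.apache.hadoop.fs.Path;
>
>   // Placeholder path; a local checkpoint dir goes through ChecksumFileSystem,
>   // which is the layer that writes the .crc files.
>   Path checkpointPath = new Path("file:///tmp/my-spark-checkpoint-dir");
>   FileSystem fs = checkpointPath.getFileSystem(new Configuration());
>   fs.setWriteChecksum(false);   // don't write .crc files from this client
>   fs.setVerifyChecksum(false);  // don't verify them on read either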
>
> As you've found here, it's super important to find the right number of
> shuffle partitions. The state is partitioned by a hash function, so the
> right number strongly depends on the grouping key. If the cardinality of the
> grouping key is low, the right number of shuffle partitions is probably
> fairly small. Unfortunately, once the query has run you can't change the
> number of shuffle partitions, as Spark has no feature for migrating state
> when the number of partitions changes. Either you need to predict the
> overall cardinality at a specific time and set the right value up front (see
> the sketch after the links below), or try a 3rd-party state tool [2]
> (DISCLAIMER: I'm the author).
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://github.com/HeartSaVioR/spark-state-tools
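>
> To be explicit about where that value goes, here's a minimal sketch (the
> app name and the value itself are placeholders); the point is that the
> setting has to be chosen before the query writes its first checkpoint:
>
>   import org.apache.spark.sql.SparkSession;
>
>   SparkSession spark = SparkSession.builder()
>       .appName("streaming-app")
>       // must be fixed before the first run of the query; existing state
>       // cannot be repartitioned to a different number of partitions
>       .config("spark.sql.shuffle.partitions", "10")
>       .getOrCreate();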
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev <obog...@gmail.com> wrote:
>
>> Hi Jungtaek,
>>
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>>
>> There are two:
>>
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir
>> for the app>)
>> contains only an empty subdirectory with a GUID name
>>
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", <checkpoint dir
>> for writer>)
>> contains all the files
>>
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>>
>> The breakdown of file creations within these subdirectories, over 69
>> microbatches (with shuffle partition count = 200), is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>>
>> (A creation is identified by an strace record for the "openat" system call
>> with the O_CREAT flag and a file path in the corresponding directory.)
>>
>> When the shuffle partition count is 10, the breakdown of file creations,
>> over the same 69 microbatches, is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>>
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input or small inputs), etc.*
>>
>>
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for the 2-min window buckets becomes a key), and the microbatch
>> is large enough: the whole problem is that Spark does not want to save the
>> microbatch as a single file. Even after I reduce the number of shuffle
>> partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>>
>> ..........
>>
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in the checkpoint store and 202 files in the Spark local-dir.
>>
>> Of the checkpoint files: 24 per microbatch are snapshot files, and 788 are
>> delta files.
>> The same figures per microbatch per partition: 0.12 snapshot files, 4 delta
>> files.
>> Of the local-dir files: 200 temp_shuffle files per microbatch (as expected)
>> and 2 other files (shuffle.data + shuffle.index).
>>
>> If I reduce the number of shuffle partitions, two things happen:
>> - Throughput of a single pipeline improves.
>> - CPU usage by the pipeline is reduced (allowing a single node to co-run
>> a larger number of pipelines).
>> Most of the improvements are gained by the time the number of partitions
>> is reduced to 5-10.
>> Going below that, further improvements are marginal.
>>
>> When reducing the number of shuffle partitions from 200 to 10, the physical
>> latency of data ingestion into the checkpoint drops by a factor of 1.9, and
>> CPU usage drops by a factor of 2.6.
>>
>> When reducing the number of shuffle partitions from 200 to 5, the physical
>> latency of data ingestion into the checkpoint drops by a factor of 2.1, and
>> CPU usage drops by a factor of 4.5.
>>
>> Still, latency remains high, because the number of created files per
>> microbatch remains high.
>>
>> ..........
>>
>> With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
>> the checkpoint store and 6.9 files in the Spark local-dir.
>> Of the checkpoint files: 0.15 per microbatch are snapshot files, and 19.7
>> are delta files.
>> Of the local-dir files: 4.93 temp_shuffle files per microbatch (as expected)
>> and 2 other files (shuffle.data + shuffle.index).
>>
>> Why would Spark need to create 20 delta files per microbatch, or to put it
>> another way, 4 delta files per microbatch per shuffle partition?
>>
>> One could guess that this is due to the changing "timestamp", but that does
>> not bear out. In my produced stream (69 microbatches) there are only 46
>> distinct timestamp values, increasing consecutively from the first timestamp
>> to the last. Thus many microbatches will have just one timestamp value; on
>> average a microbatch has 1.5 distinct timestamp values. Of course, it would
>> be terribly wrong for Spark to use the raw timestamp value as a key: in the
>> real world almost every event would have a unique timestamp, so the number
>> of files required to save state keyed by timestamp would be insane, and
>> hopefully Spark does not attempt that. But perhaps it uses the index of the
>> 2-minute window bucket as a key. If so, there are only 6 distinct values in
>> the whole event set (I have the window size set to 2 minutes, the watermark
>> to 10 minutes, and the event set spans 11.5 minutes). Thus 90% of the
>> microbatches fall wholly within a single window bucket, and 10% span two
>> buckets. So why 4 delta files per microbatch per shuffle partition?
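>>
>> For reference, a minimal sketch of the kind of query described above (the
>> event-time column name and the input dataset "events" are placeholders):
>>
>>   import static org.apache.spark.sql.functions.col;
>>   import static org.apache.spark.sql.functions.window;
>>   import org.apache.spark.sql.Dataset;
>>   import org.apache.spark.sql.Row;
>>
>>   // "events" is the streaming Dataset<Row> read from the source;
>>   // 2-minute event-time windows with a 10-minute watermark, as above.
>>   Dataset<Row> aggregated = events
>>       .withWatermark("timestamp", "10 minutes")
>>       .groupBy(window(col("timestamp"), "2 minutes"))
>>       .count();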
>>
>> ..........
>>
>> For completeness of the picture, if I run the test with the shuffle
>> partition count set to 1:
>> Spark creates 8 files (per microbatch) in the checkpoint store and 3
>> files in the Spark local-dir.
>> Of the checkpoint files: 0.03 snapshot files per microbatch (only 2
>> snapshot files in the whole run) and 4 delta files per microbatch.
>> Of the local-dir files: 1 temp_shuffle file per microbatch (as expected)
>> and 2 other files (shuffle.data + shuffle.index).
>>
>> ..........
>>
>> Thus Spark SS seems to create 4 delta files per microbatch per shuffle
>> partition, regardless of the number of shuffle partitions.
>> Why would it have to do this?
>>
>> I am also unsure why Spark has to create local-dir files for every
>> microbatch, rather than keeping them open across microbatches and reusing
>> them from one microbatch to another (overwriting the data, but without
>> having to go through file creation)...
>>
>> *> Spark leverages HDFS API which is configured to create crc file per
>> file by default. *
>>
>>
>> This is unfortunate. It would be much better to create checkpoint files
>> with the HDFS CRC "shadow file" disabled and instead store the CRC (if
>> desired) inside the main file itself, rather than in a separate file.
>>
>> * * *
>>
>> While we are at it, I wanted to ask a related question. Suppose I create
>> multiple parallel streaming pipelines in the application, where by
>> "pipeline" I mean the whole data stream from the initial Dataset/DatasetReader
>> to the output of the Spark query. Suppose there is a large number of such
>> parallel pipelines in the application, say dozens or hundreds.
>>
>> How would Spark process them in terms of the threading model? Will there be
>> a separate thread per active stream/query, or does Spark use a bounded
>> thread pool? Do many streams/queries result in many threads, or in a
>> limited number of threads?
>>
>> Thanks,
>> Sergey
>>
>
