FYI, SPARK-30294 is merged and will be available in Spark 3.1.0. This reduces the number of temp files for the state store by half when you use streaming aggregation.
1. https://issues.apache.org/jira/browse/SPARK-30294

On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> I can't spend too much time on explaining one by one. I strongly encourage
> you to do a deep dive instead of just looking around, since you want to
> know the "details" - that's how open source works.
>
> I'll go through a general explanation instead of replying inline; I'd
> probably write a blog post if there's no existing doc (I guess there
> should be one) rather than putting too much time in here.
>
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
>
> Spark should be aware of the offsets the query has read for batch 4, and
> preferably the offsets the query read for batch 5. That's what the
> offsets/commits files are for. State is for storing the accumulated values
> of stateful operations. Same here - Spark should be able to read the state
> for batch 4 so that it can calculate the new accumulated values for batch
> 5. In addition, the number of partitions sets the max parallelism
> (partitions aren't aware of each other, and they shouldn't be), hence the
> state for each partition has to be stored individually.
>
> Storing 4 files per partition per micro-batch (in the end we'll only have
> "2" files, but here I count temp files and crc files, as we are talking
> about the performance aspect) is the thing I already explained - I agree
> it's not ideal; e.g. I submitted the PR for SPARK-30294 [1], which reduces
> the number of files by half. We could probably propose that Hadoop skip
> creating CRC files (I'm not sure that can be done simply as of now), but
> Spark is conservative about upgrading the versions of its dependencies, so
> the fix might not be available soon even if we address it right away.
>
> As you've found here, it's super important to find the right value for the
> number of shuffle partitions. State is partitioned by a hash function, so
> the right value strongly depends on the grouping key. If the cardinality
> of the grouping key is low, the right number of shuffle partitions is
> probably fairly small. Unfortunately, once the query runs you can't change
> the number of shuffle partitions, as Spark has no state-migration feature
> for when the number of partitions changes. Either you need to predict the
> overall cardinality at a specific time and set the right value up front,
> or try a 3rd-party state tool [2]. (DISCLAIMER: I'm the author.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://github.com/HeartSaVioR/spark-state-tools
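To make the advice above concrete, here is a minimal sketch (not from the
original thread; the app name, paths, and the rate source are placeholders)
of a windowed streaming aggregation with an explicit checkpoint location and
a fixed shuffle partition count:

// Minimal sketch of the setup under discussion (illustrative names/paths).
// spark.sql.shuffle.partitions is fixed at the first run of the query: the
// state store keeps one subdirectory per shuffle partition under
// <checkpointLocation>/state, so the value cannot change later without
// state migration.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class CheckpointSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("checkpoint-sketch")
        .master("local[2]")
        .config("spark.sql.shuffle.partitions", "10") // pick before first run
        .getOrCreate();

    // Placeholder source; any streaming Dataset with an event-time column works.
    Dataset<Row> events = spark.readStream().format("rate").load();

    Dataset<Row> counts = events
        .withWatermark("timestamp", "10 minutes")
        .groupBy(window(col("timestamp"), "2 minutes")) // 2-minute buckets, as in the thread
        .count();

    StreamingQuery query = counts.writeStream()
        .outputMode("update")
        .format("console")
        // commits/, metadata/, offsets/ and state/ are created under here.
        .option("checkpointLocation", "my-spark-checkpoint-dir/writer")
        .start();

    query.awaitTermination();
  }
}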
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev <obog...@gmail.com> wrote:
>
>> Hi Jungtaek,
>>
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>>
>> There are two:
>>
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir
>> for the app>)
>> contains only an empty subdir with a GUID name
>>
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", <checkpoint dir
>> for writer>)
>> contains all the files
>>
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>>
>> The breakdown of file creations within them, per 69 microbatches (when
>> the shuffle partition count = 200), is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>>
>> (A creation is identified by an strace record for the "openat" system
>> call with the O_CREAT flag and a file path in the corresponding directory.)
>>
>> When the shuffle partition count is 10, the breakdown of file creations
>> within them, per 69 microbatches, is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>>
>> *> The size of a delta file heavily depends on your stateful operation
>> and the data in each micro-batch. A delta file only captures the "changes"
>> of state in a specific micro-batch, so there are cases where you'll have
>> very tiny delta files, e.g. the cardinality of the grouping key is small
>> (hence the cardinality of KVs is also small), a small amount of input is
>> provided per micro-batch, the overall size of the aggregated row is small,
>> there's skew on the grouping key (hence some partitions get no input or
>> small inputs), etc.*
>>
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for the 2-min window buckets becomes a key), and the
>> microbatch is large enough: the whole problem is that Spark does not want
>> to save the microbatch as a single file. Even after I reduce the number of
>> shuffle partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>>
>> ..........
>>
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in the checkpoint store and 202 files in the Spark
>> local-dir.
>>
>> Of the checkpoint files: 24 per microbatch are snapshot files, and 788
>> are delta files.
>> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
>> Of the local-dir files: 200 temp_shuffle files per microbatch (as
>> expected) and 2 other files (shuffle.data + shuffle.index).
>>
>> If I reduce the number of shuffle partitions, two things happen:
>> - Throughput of a single pipeline improves.
>> - CPU usage by the pipeline is reduced (allowing a single node to co-run
>> a larger number of pipelines).
>> Most of the improvement is gained by the time the number of partitions is
>> reduced to 5-10. Going below that, further improvements are marginal.
>>
>> When reducing the number of shuffle partitions from 200 to 10, the
>> physical latency of data ingestion into the checkpoint is reduced 1.9
>> times, and CPU usage is reduced 2.6 times.
>>
>> When reducing the number of shuffle partitions from 200 to 5, the
>> physical latency of data ingestion into the checkpoint is reduced 2.1
>> times, and CPU usage is reduced 4.5 times.
>>
>> Still, latency remains high, because the number of created files per
>> microbatch remains high.
>>
>> ..........
>>
>> With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
>> the checkpoint store and 6.9 files in the Spark local-dir.
>> Of the checkpoint files: 0.15 per microbatch are snapshot files, and 19.7
>> are delta files.
>> Of the local-dir files: 4.93 temp_shuffle files per microbatch (as
>> expected) and 2 other files (shuffle.data + shuffle.index).
>>
>> Why would Spark need to create 20 delta files per microbatch, or to put
>> it another way: 4 delta files per microbatch per shuffle partition?
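For reference, these creation counts line up with the
4-files-per-partition-per-micro-batch accounting in Jungtaek's reply above:
per shuffle partition per micro-batch, the state store ends up creating
roughly four files - a temp delta file and the final delta file, each with
its .crc shadow - of which two survive (e.g. 69 batches x 200 partitions
x 4 = 55,200, close to the observed 56,232). A small sketch for tallying
what actually remains under the state directory (the path is illustrative):

// Sketch (illustrative path): tally the files that remain under the
// checkpoint's state directory, grouped by extension, to compare against
// the strace-based creation counts. Expect about one .delta file (plus a
// .crc shadow) per shuffle partition per committed micro-batch, and an
// occasional .snapshot from state store maintenance.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class StateFileTally {
  public static void main(String[] args) throws IOException {
    Path state = Paths.get("my-spark-checkpoint-dir/writer/state");
    Map<String, Long> byExtension = new TreeMap<>();
    try (Stream<Path> files = Files.walk(state)) {
      files.filter(Files::isRegularFile).forEach(p -> {
        String name = p.getFileName().toString();
        String ext = name.substring(name.lastIndexOf('.') + 1); // delta, snapshot, crc
        byExtension.merge(ext, 1L, Long::sum);
      });
    }
    byExtension.forEach((ext, count) -> System.out.println(ext + " = " + count));
  }
}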
>> One could try to guess this could be due to a changing "timestamp", but
>> this does not bear out. In my produced stream (69 microbatches) there are
>> only 46 distinct values for the timestamp, consecutively increasing from
>> the first timestamp to the last. Thus lots of microbatches will have just
>> one timestamp value; on average, a microbatch will have 1.5 distinct
>> timestamp values. But it would, of course, be terribly wrong for Spark to
>> use the raw timestamp value as a key, as in the real world almost every
>> event would have a unique timestamp, so the number of files required for
>> saving by timestamp as a key would be insane - hopefully Spark does not
>> attempt to do that. But perhaps it may use the index of the 2-minute
>> window bucket as a key. If so, there are only 6 distinct values in the
>> whole event set (I have the window size set to 2 minutes, the watermark to
>> 10 minutes, and the event set spans 11.5 minutes). Thus, 90% of
>> microbatches will fall wholly within just one window bucket, and 10%
>> within two buckets. So why 4 delta files per microbatch per shuffle
>> partition?
>>
>> ..........
>>
>> For completeness of the picture: if I run the test with the shuffle
>> partition count set to 1, then Spark creates 8 files (per microbatch) in
>> the checkpoint store and 3 files in the Spark local-dir.
>> Of the checkpoint files: 0.03 per microbatch are snapshot files (only 2
>> snapshot files in the whole run), and 4 are delta files per microbatch.
>> Of the local-dir files: 1 temp_shuffle file per microbatch (as expected)
>> and 2 other files (shuffle.data + shuffle.index).
>>
>> ..........
>>
>> Thus Spark SS seems to keep 4 delta files per microbatch per shuffle
>> partition, no matter what the number of shuffle partitions is. Why would
>> it have to do this?
>>
>> I am also unsure why Spark has to create local-dir files for every
>> microbatch, rather than keeping them open across microbatches and re-using
>> them from one microbatch to another (writing the data over, but without
>> having to go through file creation)...
>>
>> *> Spark leverages the HDFS API, which is configured to create a crc file
>> per file by default.*
>>
>> This is unfortunate. It would be much better to create checkpoint files
>> with the HDFS CRC "shadow file" disabled and instead have the CRC (if
>> desired) right inside the main file itself, rather than in a separate
>> file.
>>
>> * * *
>>
>> While we are at it, I wanted to ask a related question. Suppose I create
>> multiple parallel streaming pipelines in the application - by a pipeline I
>> mean the whole data stream from the initial Dataset/DatasetReader to the
>> output of the Spark query - and suppose the number of these parallel
>> pipelines is large, let us say dozens or hundreds.
>>
>> How would Spark process them in terms of the threading model? Will there
>> be a separate thread per active stream/query, or does Spark use a bounded
>> thread pool? Do many streams/queries result in many threads, or in a
>> limited number of threads?
>>
>> Thanks,
>> Sergey
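On the final question: as far as I know, each started streaming query in
micro-batch mode runs its own long-lived driver-side thread (named "stream
execution thread for ..."), which plans and commits batches, while the
actual work runs as ordinary tasks on the shared executor pools. So dozens
or hundreds of concurrent queries mean as many driver threads. A small
sketch to observe this (sink, paths, and counts are illustrative):

// Sketch (illustrative): start several queries and list the driver threads.
// In micro-batch mode each query gets its own driver-side execution thread,
// so many concurrent queries mean many driver threads; executor-side tasks
// still share the normal task slots.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ManyQueries {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("many-queries").master("local[4]").getOrCreate();

    for (int i = 0; i < 5; i++) {
      Dataset<Row> src = spark.readStream().format("rate").load();
      src.writeStream()
          .format("console")
          .option("checkpointLocation", "/tmp/ckpt-" + i) // one checkpoint per query
          .start();
    }

    Thread.sleep(5000); // let the queries spin up
    Thread.getAllStackTraces().keySet().stream()
        .map(Thread::getName)
        .filter(n -> n.contains("stream execution thread"))
        .forEach(System.out::println);

    spark.streams().awaitAnyTermination();
  }
}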