Hi Jungtaek,

*> I meant the subdirectory inside the directory you're providing as
"checkpointLocation", as there're several directories in that directory...*

There are two:

*my-spark-checkpoint-dir/MainApp*
created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for
the app>)
contains only an empty subdirectory with a GUID name

*my-spark-checkpoint-dir/writer*
created by ds.writeStream().option("checkpointLocation", <checkpoint dir for
the writer>)
contains all the files
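
For reference, here is roughly how the two locations are set in my code
(the paths, the sink, and the variable names below are placeholders):

  // App-level checkpoint directory; Spark creates the GUID-named
  // subdirectory inside it -> my-spark-checkpoint-dir/MainApp
  sparkSession.sparkContext().setCheckpointDir("my-spark-checkpoint-dir/MainApp");

  // Per-query checkpoint location -> my-spark-checkpoint-dir/writer
  StreamingQuery query = ds.writeStream()
      .option("checkpointLocation", "my-spark-checkpoint-dir/writer")
      .format("parquet")                      // sink here is illustrative only
      .option("path", "output-dir")
      .start();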

Within the "writer" directory there are four subdirectories: commits,
metadata, offsets, and state.

The breakdown of file creations within them, over 69 microbatches (with
shuffle partition count = 200), is:

commits = 136
metadata = 0
offsets = 138
state = 56232

(A file creation is identified by an strace record of the "openat" system
call with the O_CREAT flag and a file path in the corresponding directory.)

When the shuffle partition count is 10, the breakdown of file creations
within them, over the same 69 microbatches, is:

commits = 136
metadata = 0
offsets = 138
state = 2760

*> The size of the delta file heavily depends on your stateful operation
and data in each micro-batch. delta file only captures the "changes" of
state in specific micro-batch, so there're cases you'll have very tiny
delta files, e.g. cardinality of grouped key is small (hence cardinality of
KVs is also small), small amount of inputs are provided per micro-batch,
the overall size of aggregated row is small, there's skew on grouped key
(hence some partitions get no input or small inputs), etc.*


In my case there is no key in the Row object (unless the bucketized
"timestamp" for the 2-minute window buckets becomes a key), and the
microbatch is large enough: the whole problem is that Spark does not want to
save the microbatch as a single file. Even after I reduce the number of
shuffle partitions (see below), the number of files per microbatch remains
significantly larger than the number of shuffle partitions.
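
For concreteness, the stateful part of the query is roughly of this shape
(the column name and the final aggregate are placeholders; the 2-minute
window and the 10-minute watermark match my actual settings):

  import static org.apache.spark.sql.functions.window;

  // Event-time aggregation: the time window is effectively the only grouping key
  Dataset<Row> aggregated = ds
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window(ds.col("timestamp"), "2 minutes"))
      .count();    // the concrete aggregate here is illustrative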

..........

When the number of shuffle partitions is 200, Spark creates 816 files (per
microbatch) in checkpoint store and 202 files in Spark local-dir.

Of the checkpoint files, 24 per microbatch are snapshot files and 788 are
delta files; per microbatch per partition, that is 0.12 snapshot files and 4
delta files.
Of the local-dir files, 200 per microbatch are temp_shuffle files (as
expected) and 2 are other files (shuffle.data + shuffle.index).

If I reduce the number of shuffle partitions, two things happen:
- Throughput of a single pipeline improves.
- CPU usage by the pipeline is reduced (allowing a single node to co-run a
larger number of pipelines).
Most of the improvement is gained by the time the number of partitions is
reduced to 5-10; going below that, further gains are marginal.
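
(For reference, in these runs the partition count is changed simply via the
standard shuffle setting, along the lines of:)

  // Lower the number of shuffle partitions for the session (default is 200)
  sparkSession.conf().set("spark.sql.shuffle.partitions", 10);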

When reducing the number of shuffle partitions from 200 to 10, physical
latency of data ingestion into the checkpoint is reduced 1.9 times, and CPU
usage is reduced 2.6 times.

When reducing the number of shuffle partitions from 200 to 5, physical
latency of data ingestion into the checkpoint is reduced 2.1 times, and CPU
usage is reduced 4.5 times.

Still, latency remains high, because the number of created files per
microbatch remains high.

..........

With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
checkpoint store and 6.9 files in Spark local-dir.
Of the checkpoint files, 0.15 per microbatch are snapshot files and 19.7 are
delta files.
Of the local-dir files, 4.93 per microbatch are temp_shuffle files (as
expected) and 2 are other files (shuffle.data + shuffle.index).

Why would Spark need to create 20 delta files per microbatch, or, to put it
another way, 4 delta files per microbatch per shuffle partition?

One might guess this is caused by the changing "timestamp", but that does not
bear out. In my produced stream (69 microbatches) there are only 46 distinct
timestamp values, increasing consecutively from the first timestamp to the
last. Thus many microbatches have just one timestamp value; on average, a
microbatch has 1.5 distinct timestamp values. Of course, it would be terribly
wrong for Spark to use the raw timestamp value as a key: in the real world
almost every event would have a unique timestamp, so the number of files
required to save state with timestamp as a key would be insane, and I hope
Spark does not attempt that. But perhaps it uses the index of the 2-minute
window bucket as a key. If so, there are only 6 distinct values across the
whole event set (I have the window size set to 2 minutes, the watermark to 10
minutes, and the event set spans 11.5 minutes). Thus 90% of microbatches fall
wholly within just one window bucket, and 10% span two buckets. So why 4
delta files per microbatch per shuffle partition?

..........

For completeness: if I run the test with the shuffle partition count set to
1, then Spark creates 8 files per microbatch in the checkpoint store and 3
files in the Spark local-dir.
Of the checkpoint files, 0.03 per microbatch are snapshot files (only 2
snapshot files in the whole run) and 4 per microbatch are delta files.
Of the local-dir files, 1 per microbatch is a temp_shuffle file (as expected)
and 2 are other files (shuffle.data + shuffle.index).

..........

Thus Spark SS seems to create 4 delta files per microbatch per shuffle
partition, regardless of the number of shuffle partitions.
Why would it have to do this?

I am also unsure why Spark has to create the local-dir files anew for every
microbatch, rather than keeping them open across microbatches and re-using
them from one microbatch to the next (overwriting the data, but without
having to go through file creation again).

*> Spark leverages HDFS API which is configured to create crc file per file
by default. *


This is unfortunate. It would be much better to create checkpoint files with
the HDFS CRC "shadow file" disabled and to store the CRC (if desired) inside
the main file itself, rather than as a separate file.
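
(A thought, purely an assumption on my part that I have not verified: when
the checkpoint location is on the local filesystem, it might be possible to
avoid the checksumming wrapper, and hence the .crc shadow files, by pointing
the file:// scheme at Hadoop's raw local filesystem:)

  // Assumption, not verified: RawLocalFileSystem bypasses the
  // ChecksumFileSystem wrapper that writes the ".crc" shadow files
  // for file:// paths.
  SparkSession sparkSession = SparkSession.builder()
      .appName("MainApp")
      .config("spark.hadoop.fs.file.impl",
              "org.apache.hadoop.fs.RawLocalFileSystem")
      .getOrCreate();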

* * *

While we are at it, I wanted to ask a related question. Suppose I create
multiple parallel streaming pipelines in the application, where by "pipeline"
I mean the whole data stream from the initial Dataset/DatasetReader to the
output of the Spark query, and suppose the number of pipelines is large, let
us say dozens or hundreds.
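
Schematically (the source, the sink, and the paths below are placeholders),
something like:

  // N independent streaming queries ("pipelines") within a single application
  for (int i = 0; i < pipelineCount; i++) {
      Dataset<Row> ds = sparkSession.readStream()
          .format("rate")                      // source is illustrative
          .load();

      ds.writeStream()
          .option("checkpointLocation", "my-spark-checkpoint-dir/writer-" + i)
          .format("parquet")                   // sink is illustrative
          .option("path", "output-dir-" + i)
          .start();
  }
  sparkSession.streams().awaitAnyTermination();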

How would Spark process them in terms of its threading model? Will there be a
separate thread per active stream/query, or does Spark use a bounded thread
pool, so that many streams/queries do not necessarily result in many threads?

Thanks,
Sergey
