1.  Are your join and aggregation based on the same keys?

You might want to look at the execution plan. It is possible that, without 
checkpointing, Spark puts the join and aggregation into the same stage to 
eliminate shuffling; with a checkpoint, you might have forced Spark to 
introduce a shuffle. I am only guessing here. You need to look at the 
execution plan to understand what Spark is doing internally.

2.  Are you certain that you are aggregating on the DataFrame that you get 
back from checkpoint()? Or are you aggregating on the DataFrame that you 
checkpointed? If it’s the latter, Spark might be executing your read + join 
twice. Again, you might want to look at the execution plan.
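To make point 2 concrete, here is a minimal PySpark sketch (column names are hypothetical, not from the original experiment): checkpoint() returns a new DataFrame, and aggregating on the original one can cause the whole lineage to be recomputed.

```python
# Hypothetical sketch: checkpoint() returns a NEW DataFrame. Aggregate on
# the returned one, not the original, or the read + join may run twice.
joined = orders.join(items, "order_id")      # assumed join key

checkpointed = joined.checkpoint()           # writes to the checkpoint dir
result = checkpointed.groupBy("order_id").count()  # reads the checkpointed data

result.explain(True)  # inspect the plan to see where shuffles occur
```

Comparing the output of explain(True) for both applications should show whether the checkpoint changed the stage boundaries.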

From: "Schneider, Felix Jan" <[email protected]>
Date: Thursday, October 7, 2021 at 7:56 AM
To: "[email protected]" <[email protected]>
Subject: [EXTERNAL] [Spark Core][Intermediate][How-to]: Measure the time for a 
checkpoint




Hi Spark-Users,

My name is Felix and I’m currently working on a master’s thesis on adaptive 
checkpointing in Spark at TU Berlin.
As part of this work, I experimented with a Spark Kubernetes cluster to measure 
the runtime differences of Spark applications with and without reliable 
checkpoints of DataFrames.
Below, I first explain how the experiment is set up and the assumptions I want 
to verify.
Second, I describe how the experiment was conducted and what the outcome was.
In the end, I share my open questions.

Experiment setup:

  *   DataFrame A: 20GB tabular data (e.g. a list of orders)
  *   DataFrame B: 10GB tabular data (e.g. a list of order items)
  *   Spark application A: first performs an inner-join of DataFrame A and 
DataFrame B to get the joined DataFrame C and then aggregates the joined 
DataFrame C
  *   Spark application B: Does the same as Spark application A plus it 
checkpoints the joined DataFrame C before it aggregates it
  *   Driver configuration: 1 driver instance with 4 CPU cores and 4GB of memory
  *   Executors configuration: 10 executor instances with 4 CPU cores and 8GB 
of memory each
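For reference, the driver/executor configuration above corresponds roughly to a spark-submit invocation like the following (the master URL and application are placeholders, not details from the experiment):

```shell
spark-submit \
  --master k8s://https://<k8s-api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.driver.cores=4 \
  --conf spark.driver.memory=4g \
  --conf spark.executor.instances=10 \
  --conf spark.executor.cores=4 \
  --conf spark.executor.memory=8g \
  <application>
```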

Assumptions I want to verify:

  1.  Spark application A will take less time than Spark application B because 
Spark application B needs time to write a checkpoint to reliable storage (HDFS)
  2.  The difference between the mean runtimes of Spark application A and Spark 
application B will equal the mean time it takes to write the checkpoint.

Experiment sequence:

  1.  5 executions of Spark application A
  2.  5 executions of Spark application B
  3.  Requesting the total duration of each application from Spark's history 
server API
  4.  Reading the time it took to checkpoint a DataFrame from the driver log of 
each execution, e.g.:

     *   21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 
ms.
     *   21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing 
RDD 68 to hdfs://checkpoints/rdd-68, new parent is RDD 77

  5.  Calculating the mean of the total durations of Spark application A’s 
executions and Spark application B’s executions, to account for differences 
in runtime due to, e.g., cluster utilization
  6.  Calculating the mean of the time it took to checkpoint in Spark 
application B’s executions, for the same reason as in 5.
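Step 4 above can be sketched as a small log-scraping helper (the regex assumes the exact ReliableCheckpointRDD log format quoted in the example):

```python
import re

# Matches the driver-log line "... ReliableCheckpointRDD: Checkpointing took N ms."
CHECKPOINT_RE = re.compile(r"Checkpointing took (\d+) ms")

def checkpoint_millis(log_lines):
    """Return all checkpoint durations (in ms) found in the driver log."""
    return [int(m.group(1)) for line in log_lines
            if (m := CHECKPOINT_RE.search(line)) is not None]

log = ["21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 ms."]
print(checkpoint_millis(log))  # [10024]
```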


Experiment results:

  *   The mean runtime of Spark application A: 11.72 minutes
  *   The mean runtime of Spark application B: 27.38 minutes
  *   -> The first assumption can be verified
  *   The mean time it took to write a checkpoint in Spark application B: 3.41 
minutes
  *   -> The second assumption cannot be verified because: (27.38 minutes - 
11.72 minutes) = 15.66 minutes > 3.41 minutes
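The gap behind question 2 below can be made explicit with a quick calculation using the means reported above:

```python
# Mean runtimes from the experiment, in minutes.
mean_a = 11.72          # without checkpoint
mean_b = 27.38          # with checkpoint
mean_checkpoint = 3.41  # mean time to write the checkpoint

gap = mean_b - mean_a                 # 15.66 min total slowdown
unexplained = gap - mean_checkpoint   # 12.25 min not explained by the write alone
print(round(gap, 2), round(unexplained, 2))
```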

Questions:

  1.  Is this a valid approach to collect the time for a checkpoint or are 
there other possible ways to measure this?
  2.  How can the difference in the mean runtime of Spark application A and 
Spark application B be explained if it’s greater than the mean of the time it 
took to write a checkpoint in Spark application B?

Feel free to share your thoughts and suggestions on the questions.
I’m happy to discuss this topic.

Thanks and kind regards,
Felix
