1. Are your join and aggregation based on the same keys? You might want to look at the execution plan. It is possible that without checkpointing, Spark puts the join and aggregation into the same stage to eliminate a shuffle, and that by checkpointing you have forced Spark to introduce a shuffle. I am guessing blindly here; you need to look at the execution plan to understand what Spark is doing internally.
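As a quick way to check this (a sketch, not tested against your job): in PySpark, `df.explain()` prints the physical plan, and `Exchange` nodes in that plan mark shuffles, so comparing the number of `Exchange` operators with and without the checkpoint would show whether one was introduced. The DataFrame names and join key below are assumptions, and `_jdf.queryExecution()` is an internal API that varies by Spark version:

```python
import re

def count_shuffles(plan_text: str) -> int:
    """Count Exchange (shuffle) operators in a physical-plan string."""
    return len(re.findall(r"\bExchange\b", plan_text))

# With a live SparkSession (hypothetical DataFrames df_a, df_b):
#   joined = df_a.join(df_b, "order_id")            # "order_id" is an assumed key
#   joined.explain()                                # prints the physical plan
#   plan = joined._jdf.queryExecution().toString()  # internal API; version-dependent
#   print(count_shuffles(plan))

# Offline example on a made-up plan fragment:
sample_plan = """
*(5) HashAggregate(keys=[order_id], functions=[sum(amount)])
+- Exchange hashpartitioning(order_id, 200)
   +- *(4) SortMergeJoin [order_id], [order_id], Inner
      +- Exchange hashpartitioning(order_id, 200)
"""
print(count_shuffles(sample_plan))  # -> 2
```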
2. Are you certain that you are aggregating on the DataFrame returned by the checkpoint call, rather than on the DataFrame that you checkpointed? If it is the latter, Spark might be executing your read + join twice. Again, you might want to look at the execution plan.

From: "Schneider, Felix Jan" <[email protected]>
Date: Thursday, October 7, 2021 at 7:56 AM
To: "[email protected]" <[email protected]>
Subject: [EXTERNAL] [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint

Hi Spark-Users,

my name is Felix and I'm currently working on a master's thesis on adaptive checkpointing in Spark at TU Berlin. As part of this work, I experimented with a Spark-on-Kubernetes cluster to measure the runtime differences of Spark applications with and without reliable checkpoints of DataFrames. Below, I first explain how the experiment is set up and the assumptions I want to verify. Second, I describe how the experiment was conducted and what the outcome was. At the end, I share my open questions.

Experiment setup:
* DataFrame A: 20 GB of tabular data (e.g. a list of orders)
* DataFrame B: 10 GB of tabular data (e.g. a list of order items)
* Spark application A: first performs an inner join of DataFrame A and DataFrame B to get the joined DataFrame C, then aggregates the joined DataFrame C
* Spark application B: does the same as Spark application A, plus it checkpoints the joined DataFrame C before aggregating it
* Driver configuration: 1 driver instance with 4 CPU cores and 4 GB of memory
* Executor configuration: 10 executor instances with 4 CPU cores and 8 GB of memory each

Assumptions I want to verify:
1. Spark application A will take less time than Spark application B, because Spark application B needs time to write a checkpoint to reliable storage (HDFS).
2.
The mean runtime difference between Spark application A and Spark application B will equal the mean time it takes to write the checkpoint.

Experiment sequence:
1. 5 executions of Spark application A
2. 5 executions of Spark application B
3. Requesting the total duration of each application from Spark's history server API
4. Reading the time it took to checkpoint a DataFrame from the driver log of each execution, e.g.:
   * 21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 ms.
   * 21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing RDD 68 to hdfs://checkpoints/rdd-68, new parent is RDD 77
5. Calculating the mean of the total durations of Spark application A's and Spark application B's executions, to account for differences in runtime due to e.g. cluster utilization
6. Calculating the mean of the time it took to checkpoint in Spark application B's executions, for the same reason as in 5.

Experiment results:
* The mean runtime of Spark application A: 11.72 minutes
* The mean runtime of Spark application B: 27.38 minutes
* -> The first assumption can be verified
* The mean time it took to write a checkpoint in Spark application B: 3.41 minutes
* -> The second assumption cannot be verified, because (27.38 minutes - 11.72 minutes) = 15.66 minutes > 3.41 minutes

Questions:
1. Is this a valid approach to collect the time for a checkpoint, or are there other possible ways to measure it?
2. How can the difference in the mean runtimes of Spark application A and Spark application B be explained if it is greater than the mean time it took to write a checkpoint in Spark application B?

Feel free to share your thoughts and suggestions on these questions. I'm happy to discuss this topic.

Thanks and kind regards,
Felix
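For what it's worth, steps 4-6 of the sequence above (extracting the `ReliableCheckpointRDD: Checkpointing took … ms` lines from the driver logs and averaging them) can be automated with a small script. This is a sketch in plain Python; the second log line is made up to illustrate multiple runs, and in practice you would read the lines from the downloaded driver-log files:

```python
import re
from statistics import mean

# Matches the driver-log line quoted in step 4 of the experiment sequence.
CHECKPOINT_RE = re.compile(r"ReliableCheckpointRDD: Checkpointing took (\d+) ms")

def checkpoint_times_ms(log_lines):
    """Extract checkpoint durations in milliseconds from driver-log lines."""
    return [int(m.group(1)) for line in log_lines
            if (m := CHECKPOINT_RE.search(line))]

# Example with the log line quoted above plus a fabricated second run:
logs = [
    "21/10/01 09:39:05 INFO ReliableCheckpointRDD: Checkpointing took 10024 ms.",
    "21/10/01 09:39:05 INFO ReliableRDDCheckpointData: Done checkpointing RDD 68",
    "21/10/02 10:02:11 INFO ReliableCheckpointRDD: Checkpointing took 9876 ms.",
]
times = checkpoint_times_ms(logs)
print(times)                # [10024, 9876]
print(mean(times) / 60000)  # mean checkpoint time in minutes
```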
