Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-05-02 Thread Trường Trần Phan An
Hi all,

I have written a program that overrides two listener callbacks,
onStageCompleted and onTaskEnd. However, these two events do not tell me
when a Task/Stage is completed.

What I want to know is which Task corresponds to which stage of a DAG (the
Spark history server only tells me how many stages a Job has and how many
tasks a Stage has).

Can I print out the edges of the Tasks according to the DAGScheduler?
Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.scheduler.{SparkListener,
  SparkListenerStageCompleted, SparkListenerTaskEnd}

class CustomListener extends SparkListener {
  // Called once per stage, after the stage has finished.
  override def onStageCompleted(
      stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")

    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  // Called once per task; taskEnd.stageId ties the task to its stage.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val stageAttemptId = taskEnd.stageAttemptId
    val taskInfo = taskEnd.taskInfo
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; " +
      s"Duration: ${taskInfo.duration} ms.")
  }

  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val flatMap = data.flatMap(line => line.split(","))
    val map = flatMap.map(word => (word, 1))
    val reduceByKey = map.reduceByKey(_ + _)
    // Assumed: the original action was lost from this message. Some action
    // (count() here) is needed, otherwise no job, stage or task ever runs.
    reduceByKey.count()
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {

    val appName = "scenario1"
    // Assumed: the rest of the builder chain was cut off in the archive.
    val spark = SparkSession.builder().appName(appName).getOrCreate()

    val sc = spark.sparkContext
    val sparkListener = new CustomListener()
    // Assumed: the registration line is missing from the archived message;
    // without it the listener receives no events.
    sc.addSparkListener(sparkListener)
    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
  }
}
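
On the question of printing edges: as far as I understand, the scheduler tracks dependencies between stages rather than between individual tasks, so the closest thing to "task edges" is the stage graph plus the stage each task ran in. The following is only a sketch under that assumption. StageEdges and StageGraphListener are hypothetical names; the fields used (stageInfos, parentIds, stageId, taskInfo.index, taskInfo.finishTime) come from the public listener events.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd}

// Hypothetical helper: one (parent -> child) edge per parent stage.
object StageEdges {
  def edges(stageId: Int, parentIds: Seq[Int]): Seq[(Int, Int)] =
    parentIds.map(parent => (parent, stageId))
}

// Hypothetical listener: prints the stage graph when a job starts and the
// owning stage of every task when the task ends.
class StageGraphListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStart.stageInfos.foreach { info =>
      StageEdges.edges(info.stageId, info.parentIds).foreach {
        case (parent, child) =>
          println(s"Job ${jobStart.jobId}: stage $parent -> stage $child")
      }
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val task = taskEnd.taskInfo
    println(s"Task ${task.taskId} (partition index ${task.index}) " +
      s"ran in stage ${taskEnd.stageId}, finished at ${task.finishTime}")
  }
}
```

It would be registered the same way as the listener above, with sc.addSparkListener(new StageGraphListener()), before the first action runs.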


Best regards,


On Sun, Apr 16, 2023 at 09:32, Trường Trần Phan An wrote:

> Dear Jacek Laskowski,
> Thank you for your guide. I will try it out for my problem.
> Best regards,
> Truong
> On Fri, Apr 14, 2023 at 21:00, Jacek Laskowski wrote:
>> Hi,
>> Start with intercepting stage completions
>> using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and
>> tasks).
>> Go up the execution chain to Spark SQL
>> with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd
>> [3], and correlate infos.
>> You may want to look at how the web UI works under the covers to collect
>> all the information. Start from SQLTab: it shows what is displayed, and
>> from there you can work out what is needed and how it is collected.
>> [1]
>> [2]
>> [3]
>> [4]
>> Best regards,
>> Jacek Laskowski
>> "The Internals Of" Online Books 
>> On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An wrote:
>>> Hi,
>>> Can you give me more details, or point me to a tutorial, on "You'd have
>>> to intercept execution events and correlate them. Not an easy task yet
>>> doable"?
>>> Thanks
>>> On Wed, Apr 12, 2023 at 21:04, Jacek Laskowski wrote:

 tl;dr it's not possible to "reverse-engineer" tasks to functions.

 In essence, Spark SQL is an abstraction layer over the RDD API that's made
 up of partitions and tasks. Tasks are Scala functions (possibly with some
 Python for PySpark). A simple-looking high-level operator like
 DataFrame.join can end up with multiple RDDs, each with a set of partitions
 (and hence tasks). What the tasks do is an implementation detail that you'd
 have to know about by reading the source code of Spark SQL that produces
 the "bytecode".
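
 To make the point about a single join turning into several RDDs concrete, here is a minimal sketch, assuming a local SparkSession and two tiny made-up DataFrames. JoinLineage and its column names are hypothetical. explain() prints the physical plan, and queryExecution.toRdd.toDebugString prints the RDD lineage generated for it.

```scala
import org.apache.spark.sql.SparkSession

object JoinLineage {
  // Returns the RDD lineage Spark SQL generates for a small join.
  def lineage(spark: SparkSession): String = {
    import spark.implicits._
    val left = Seq((1, "a"), (2, "b")).toDF("id", "l")   // made-up data
    val right = Seq((1, "x"), (2, "y")).toDF("id", "r")  // made-up data
    val joined = left.join(right, "id")
    joined.explain() // prints the physical plan chosen by Spark SQL
    joined.queryExecution.toRdd.toDebugString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("join-lineage")
      .master("local[*]") // assumption: run locally for illustration
      .getOrCreate()
    try println(lineage(spark)) finally spark.stop()
  }
}
```

 The output names the generated RDDs, not the user-level operators that produced them, which is why mapping tasks back to functions requires knowing the Spark SQL internals.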

 Just looking at the DAG or the tasks scr

CVE-2023-32007: Apache Spark: Shell command injection via Spark UI

2023-05-02 Thread Arnout Engelen
Severity: important

Affected versions:

- Apache Spark 3.1.1 before 3.2.2


** UNSUPPORTED WHEN ASSIGNED ** The Apache Spark UI offers the possibility to 
enable ACLs via the configuration option spark.acls.enable. With an 
authentication filter, this checks whether a user has access permissions to 
view or modify the application. If ACLs are enabled, a code path in 
HttpSecurityFilter can allow someone to perform impersonation by providing an 
arbitrary user name. A malicious user might then be able to reach a permission 
check function that will ultimately build a Unix shell command based on their 
input, and execute it. This will result in arbitrary shell command execution as 
the user Spark is currently running as. This issue was disclosed earlier as 
CVE-2022-33891, but incorrectly claimed version 3.1.3 (which has since gone 
EOL) would not be affected.

NOTE: This vulnerability only affects products that are no longer supported by 
the maintainer.

Users are recommended to upgrade to a supported version of Apache Spark, such 
as version 3.4.0.
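
As a rough aid for checking a deployment against the range stated above, the following sketch treats "3.1.1 before 3.2.2" as the half-open interval [3.1.1, 3.2.2). The object and method names are hypothetical, and the parsing ignores any suffix after the third numeric component. Per the description, exposure also depends on spark.acls.enable being set, which this check does not inspect.

```scala
import scala.math.Ordering.Implicits._

object Cve202332007 {
  private val VersionPattern = """^(\d+)\.(\d+)\.(\d+).*""".r

  // Parse "3.1.3" or "3.1.3-SNAPSHOT" into its three numeric components.
  def parse(version: String): Option[(Int, Int, Int)] = version match {
    case VersionPattern(major, minor, patch) =>
      Some((major.toInt, minor.toInt, patch.toInt))
    case _ => None
  }

  // True if the version falls in [3.1.1, 3.2.2), the range in this advisory.
  // Unparseable version strings yield false.
  def inAffectedRange(version: String): Boolean =
    parse(version).exists(v => v >= ((3, 1, 1)) && v < ((3, 2, 2)))
}
```

On a running application the version string could be taken from sc.version, for example Cve202332007.inAffectedRange(sc.version).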


Sven Krewitt, Flashpoint (reporter)

