Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1850156431 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters._ + +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.tags.ExtendedSQLTest +import org.apache.spark.util.ThreadUtils + +/** + * Test cases for the tagging and cancellation APIs provided by [[SparkSession]]. + */ +@ExtendedSQLTest +class SparkSessionJobTaggingAndCancellationSuite + extends SparkFunSuite + with Eventually + with LocalSparkContext { + + override def afterEach(): Unit = { +try { + // This suite should not interfere with the other test suites. 
+ SparkSession.getActiveSession.foreach(_.stop()) + SparkSession.clearActiveSession() + SparkSession.getDefaultSession.foreach(_.stop()) + SparkSession.clearDefaultSession() + resetSparkContext() +} finally { + super.afterEach() +} + } + + test("Tags are not inherited by new sessions") { +val session = SparkSession.builder().master("local").getOrCreate() + +assert(session.getTags() == Set()) +session.addTag("one") +assert(session.getTags() == Set("one")) + +val newSession = session.newSession() +assert(newSession.getTags() == Set()) + } + + test("Tags are inherited by cloned sessions") { +val session = SparkSession.builder().master("local").getOrCreate() + +assert(session.getTags() == Set()) +session.addTag("one") +assert(session.getTags() == Set("one")) + +val clonedSession = session.cloneSession() +assert(clonedSession.getTags() == Set("one")) +clonedSession.addTag("two") +assert(clonedSession.getTags() == Set("one", "two")) + +// Tags are not propagated back to the original session +assert(session.getTags() == Set("one")) + } + + test("Tags set from session are prefixed with session UUID") { +sc = new SparkContext("local[2]", "test") +val session = SparkSession.builder().sparkContext(sc).getOrCreate() +import session.implicits._ + +val sem = new Semaphore(0) +sc.addSparkListener(new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { +sem.release() + } +}) + +session.addTag("one") +Future { + session.range(1, 1).map { i => Thread.sleep(100); i }.count() +}(ExecutionContext.global) + +assert(sem.tryAcquire(1, 1, TimeUnit.MINUTES)) +val activeJobsFuture = + session.sparkContext.cancelJobsWithTagWithFuture(session.managedJobTags.get("one"), "reason") +val activeJob = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds).head +val actualTags = activeJob.properties.getProperty(SparkContext.SPARK_JOB_TAGS) + .split(SparkContext.SPARK_JOB_TAGS_SEP) +assert(actualTags.toSet == Set( + session.sessionJobTag, + s"${session.sessionJobTag}-one", + SQLExecution.executionIdJobTag( +session, + activeJob.properties.get(SQLExecution.EXECUTION_ROOT_ID_KEY).asInstanceOf[String].toLong))) + } + + test("Cancellation APIs in SparkSession are isolated") { +sc = new SparkContext("local[2]", "test") +val globalSession = SparkSession.builder().sparkContext(sc).getOrCreate() +var (sessionA, sessionB, sessionC): (SparkSession, SparkSession, SparkSession) = + (null, null, null) + +// global ExecutionContext has only 2 threads in Apache Spark CI +// create own thread pool for four Futures used in this test +val numThreads = 3 +val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-p
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1849328583 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala ## (quotes the same new test suite diff as above)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1848073873 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala ## (quotes the same new test suite diff as above)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
itholic commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1847520994 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala ## (quotes the same new test suite diff as above)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
itholic commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1847523272 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala ## (quotes the same new test suite diff as above)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
itholic commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1847514875 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala ## (quotes the same new test suite diff as above)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2432596207 Trying out a fix at https://github.com/apache/spark/pull/48622.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2432432181 On it.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2431640647 @xupefei mind taking a look please?
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
dongjoon-hyun commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2419853347 Ping once more, @xupefei and @hvanhovell. Could you fix the flakiness or disable it (if you are busy), please? - https://github.com/apache/spark/actions/runs/11353570330/job/31654735430
```
SparkSessionJobTaggingAndCancellationSuite:
...
- Cancellation APIs in SparkSession are isolated *** FAILED ***
```
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
dongjoon-hyun commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2358664338 FYI, there are two open JIRA issues in the interrupt and cancellation area. ![Screenshot 2024-09-18 at 07 39 14](https://github.com/user-attachments/assets/72c6b76b-443d-46db-8b78-15d18bcf3378)
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
asfgit closed pull request #47815: [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core URL: https://github.com/apache/spark/pull/47815
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2357401575 Merging to master.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1756480825 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala: ## @@ -44,14 +44,17 @@ object SQLExecution extends Logging { private def nextExecutionId: Long = _nextExecutionId.getAndIncrement - private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() + private[sql] val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() Review Comment: >does this work for streaming queries too? No, it doesn't. I have to change https://github.com/apache/spark/blob/8023504e69fdd037dea002e961b960fd9fa662ba/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L313 to make it work. Will do as a follow-up.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1752028523 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -167,7 +168,7 @@ private[spark] class DAGScheduler( // Stages that must be resubmitted due to fetch failures private[scheduler] val failedStages = new HashSet[Stage] - private[scheduler] val activeJobs = new HashSet[ActiveJob] + private[spark] val activeJobs = new HashSet[ActiveJob] Review Comment: Done! I reverted the change and switched to using `SparkListenerJobStart` to get the tags.
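A minimal sketch of that listener-based approach (not the PR's exact test code; `sc` and the map name are assumed, and the `SPARK_JOB_TAGS` constants are `private[spark]`, which the suite in this PR can access from within `org.apache.spark.sql`):
```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Record the tags of every submitted job without touching DAGScheduler internals:
// the tags travel with the job's local properties in SparkListenerJobStart.
val jobIdToTags = new ConcurrentHashMap[Int, Set[String]]()
sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val tags = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty(SparkContext.SPARK_JOB_TAGS)))
      .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).filter(_.nonEmpty).toSet)
      .getOrElse(Set.empty[String])
    jobIdToTags.put(jobStart.jobId, tags)
  }
})
```
Assertions can then be made against `jobIdToTags` once the job has started, instead of against `DAGScheduler.activeJobs`.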
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747751422 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -167,7 +168,7 @@ private[spark] class DAGScheduler( // Stages that must be resubmitted due to fetch failures private[scheduler] val failedStages = new HashSet[Stage] - private[scheduler] val activeJobs = new HashSet[ActiveJob] + private[spark] val activeJobs = new HashSet[ActiveJob] Review Comment: I am not in favor of exposing this state - even if only for tests currently. Let us keep the PR consistent; the change is small enough that we don't need to split it.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747749525 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2684,6 +2685,25 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, None) } + /** + * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`. + * + * @param tag The tag to be cancelled. Cannot contain ',' (comma) character. + * @param reason reason for cancellation. + * @return A future with [[ActiveJob]]s, allowing extraction of information such as Job ID and + * tags. + */ + private[spark] def cancelJobsWithTagWithFuture( Review Comment: Noticed that pattern in sql when reviewing (and so resolved :) ), thanks for clarifying!
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747532654 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala: ## @@ -44,14 +44,17 @@ object SQLExecution extends Logging { private def nextExecutionId: Long = _nextExecutionId.getAndIncrement - private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() + private[sql] val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() Review Comment: Thanks for the comment - I'll add it to the documentation. Streaming - not sure, I'll check.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747498524 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2684,6 +2685,25 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, None) } + /** + * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`. + * + * @param tag The tag to be cancelled. Cannot contain ',' (comma) character. + * @param reason reason for cancellation. + * @return A future with [[ActiveJob]]s, allowing extraction of information such as Job ID and + * tags. + */ + private[spark] def cancelJobsWithTagWithFuture( Review Comment: The reason for adding another construct is that jobGroupId tends to be used in a lot of places, and different components overwrite each other's jobGroupIds. Broadcast was doing this, for example, and as a result cancelling a job group would not cancel a broadcast job.
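A small sketch of the difference being described, using only public SparkContext APIs (the group and tag names are made up, and this is a conceptual illustration rather than code from this PR): a job group id is a single thread-local value that the next setJobGroup call replaces, whereas tags accumulate, so tag-based cancellation still reaches jobs whose group id was overwritten by an internal component.
```scala
sc.setJobGroup("user-group", "user query")   // one value per thread; later calls replace it
sc.addJobTag("session-tag")                  // tags are additive and coexist with the group id

// An internal component (e.g. a broadcast) installs its own group id on the same thread,
// so jobs it submits no longer carry "user-group"...
sc.setJobGroup("internal-broadcast-group", "broadcast exchange")

sc.cancelJobGroup("user-group")         // misses the jobs submitted under the new group id
sc.cancelJobsWithTag("session-tag")     // still matches them, because the tag was never dropped
```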
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747393173 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -264,8 +284,10 @@ class SparkSession private( Some(sharedState), Some(sessionState), extensions, - Map.empty) + Map.empty, + managedJobTags.asScala.toMap) Review Comment: Unfortunately, it is a mutable concurrent map - and is backed by the same map (so changes propagate back to `managedJobTags`).
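A minimal standalone sketch of the distinction behind this exchange (not PR code): `asScala` on a `ConcurrentHashMap` yields a live view backed by the same map, while `.toMap` takes an immutable snapshot detached from it.
```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

val managed = new ConcurrentHashMap[String, String]()
managed.put("one", "spark-session-<uuid>-one")

val view = managed.asScala            // mutable view, backed by `managed`
val snapshot = managed.asScala.toMap  // immutable copy, taken at this point

managed.put("two", "spark-session-<uuid>-two")
assert(view.contains("two"))          // the view reflects the later insertion
assert(!snapshot.contains("two"))     // the snapshot does not
```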
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747387516 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1234,17 +1242,23 @@ private[spark] class DAGScheduler( jobIds.foreach(handleJobCancellation(_, Option(updatedReason))) } - private[scheduler] def handleJobTagCancelled(tag: String, reason: Option[String]): Unit = { -// Cancel all jobs belonging that have this tag. + private[scheduler] def handleJobTagCancelled( + tag: String, + reason: Option[String], + cancelledJobs: Option[Promise[Seq[ActiveJob]]]): Unit = { +// Cancel all jobs that have all provided tags. // First finds all active jobs with this group id, and then kill stages for them. -val jobIds = activeJobs.filter { activeJob => +val jobsToBeCancelled = activeJobs.filter { activeJob => Option(activeJob.properties).exists { properties => Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("") .split(SparkContext.SPARK_JOB_TAGS_SEP).filter(!_.isEmpty).toSet.contains(tag) } -}.map(_.jobId) -val updatedReason = reason.getOrElse("part of cancelled job tag %s".format(tag)) -jobIds.foreach(handleJobCancellation(_, Option(updatedReason))) +} +cancelledJobs.map(_.success(jobsToBeCancelled.toSeq)) Review Comment: Do this after `handleJobCancellation` below
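One way to read that suggestion, sketched against the local names in the quoted hunk (a sketch only, not the code the PR finally merged): hand the matched jobs to `handleJobCancellation` first, and only then complete the promise, so consumers of the future observe jobs whose cancellation has actually been initiated.
```scala
// Sketch of the reordered tail of handleJobTagCancelled, reusing the hunk's local names.
val updatedReason = reason.getOrElse(s"part of cancelled job tag $tag")
jobsToBeCancelled.map(_.jobId).foreach(handleJobCancellation(_, Option(updatedReason)))
cancelledJobs.foreach(_.success(jobsToBeCancelled.toSeq)) // foreach, not map: the result is unused
```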
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747364819 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2684,6 +2685,25 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, None) } + /** + * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`. + * + * @param tag The tag to be cancelled. Cannot contain ',' (comma) character. + * @param reason reason for cancellation. + * @return A future with [[ActiveJob]]s, allowing extraction of information such as Job ID and + * tags. + */ + private[spark] def cancelJobsWithTagWithFuture( Review Comment: Ok, so this got added to 3.5 - missed it there.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
mridulm commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1747358212 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2684,6 +2685,25 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.cancelJobGroup(groupId, cancelFutureJobs = true, None) } + /** + * Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`. + * + * @param tag The tag to be cancelled. Cannot contain ',' (comma) character. + * @param reason reason for cancellation. + * @return A future with [[ActiveJob]]s, allowing extraction of information such as Job ID and + * tags. + */ + private[spark] def cancelJobsWithTagWithFuture( Review Comment: There is already existing support for group id - why not leverage that instead of adding a new construct? I did not see the initial PR in 4.0 which added tags to the DAG scheduler.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1746566200 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala: ## @@ -44,14 +44,17 @@ object SQLExecution extends Logging { private def nextExecutionId: Long = _nextExecutionId.getAndIncrement - private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() + private[sql] val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]() Review Comment: BTW, does this work for streaming queries too? I am fine with doing it in a follow-up but would like to make sure the Spark Connect and Classic versions behave the same.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1745490540 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala: ## @@ -238,15 +245,30 @@ object SQLExecution extends Logging { val sc = sparkSession.sparkContext val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) withSQLConfPropagated(sparkSession) { - try { -sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId) -body - } finally { -sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId) + withSessionTagsApplied(sparkSession) { +try { + sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId) + body +} finally { + sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId) +} } } } + private[sql] def withSessionTagsApplied[T](sparkSession: SparkSession)(block: => T): T = { +sparkSession.sparkContext.addJobTag(sparkSession.sessionJobTag) Review Comment: This begs for a method that sets them all at once... you are incrementally building a string here :)
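A hypothetical shape for such a bulk setter (this method does not exist in SparkContext; `SPARK_JOB_TAGS`, `SPARK_JOB_TAGS_SEP`, and `throwIfInvalidTag` are the internal helpers the surrounding code already relies on). It writes the whole tag set in a single local-property assignment, so it only suits call sites that own the calling thread's tags rather than adding to tags set by user code:
```scala
// Hypothetical helper, sketched for illustration; not an existing Spark API.
private[spark] object JobTagUtils {
  def setJobTags(sc: SparkContext, tags: Set[String]): Unit = {
    tags.foreach(SparkContext.throwIfInvalidTag)
    // Replace the thread-local tag set in one assignment instead of
    // rebuilding the comma-separated string once per addJobTag call.
    sc.setLocalProperty(
      SparkContext.SPARK_JOB_TAGS,
      tags.mkString(SparkContext.SPARK_JOB_TAGS_SEP))
  }
}
```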
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1745480347 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -264,8 +284,10 @@ class SparkSession private( Some(sharedState), Some(sessionState), extensions, - Map.empty) + Map.empty, + managedJobTags.asScala.toMap) Review Comment: qq, does this produce an immutable map (I think it should)? If so, then you don't have to force materialization.
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1745473650 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -649,6 +671,77 @@ class SparkSession private( artifactManager.addLocalArtifacts(uri.flatMap(Artifact.parseArtifacts)) } + /** @inheritdoc */ + override def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +managedJobTags.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** @inheritdoc */ + override def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +managedJobTags.remove(tag) + } + + /** @inheritdoc */ + override def getTags(): Set[String] = managedJobTags.keys().asScala.toSet + + /** @inheritdoc */ + override def clearTags(): Unit = managedJobTags.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of SQL execution IDs requested to be interrupted. + + * @since 4.0.0 + */ + override def interruptAll(): Seq[String] = +doInterruptTag(sessionJobTag, "as part of cancellation of all jobs") + + /** + * Request to interrupt all currently running operations of this session with the given job tag. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return Sequence of SQL execution IDs requested to be interrupted. + */ + override def interruptTag(tag: String): Seq[String] = { +val realTag = managedJobTags.get(tag) +if (realTag == null) return Seq.empty +doInterruptTag(realTag, s"part of cancelled job tags $tag") + } + + private def doInterruptTag(tag: String, reason: String): Seq[String] = { +val cancelledTags = + sparkContext.cancelJobsWithTagWithFuture(tag, reason) + +ThreadUtils.awaitResult(cancelledTags, 60.seconds) + .flatMap(job => Option(job.properties.getProperty(SQLExecution.EXECUTION_ROOT_ID_KEY))) + } + + /** + * Request to interrupt an operation of this session, given its SQL execution ID. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return The execution ID requested to be interrupted, as a single-element sequence, or an empty + *sequence if the operation is not started by this session. + * + * @since 4.0.0 + */ + override def interruptOperation(operationId: String): Seq[String] = { +scala.util.Try(operationId.toLong).toOption match { + case Some(executionIdToBeCancelled) => +val tagToBeCancelled = SQLExecution.executionIdJobTag(this, executionIdToBeCancelled) +doInterruptTag(tagToBeCancelled, reason = "") Review Comment: Perhaps set a reason?
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1745467918 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -649,6 +671,77 @@ class SparkSession private( artifactManager.addLocalArtifacts(uri.flatMap(Artifact.parseArtifacts)) } + /** @inheritdoc */ + override def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +managedJobTags.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** @inheritdoc */ + override def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) Review Comment: Not really needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1745463529 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +130,17 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** Tag to mark all jobs owned by this session. */ + private[sql] lazy val sessionJobTag = s"spark-session-$sessionUUID" + + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-session-$sessionUUID-tag1"`. + */ + @transient + private[sql] lazy val managedJobTags: ConcurrentHashMap[String, String] = + new ConcurrentHashMap(parentManagedJobTags.asJava) Review Comment: Odd style... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
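If the "odd style" refers to constructing the map straight from `parentManagedJobTags.asJava`, a more conventional (if slightly more verbose) construction could look like the sketch below; the helper name is made up:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical alternative: start from an empty ConcurrentHashMap and copy the
// parent's tags explicitly instead of converting the Scala map to a Java map inline.
def inheritedJobTags(parentTags: Map[String, String]): ConcurrentHashMap[String, String] = {
  val tags = new ConcurrentHashMap[String, String]()
  parentTags.foreach { case (userTag, realTag) => tags.put(userTag, realTag) }
  tags
}
```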
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1738508417 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( + _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt all currently running operations of this session with the given operation + * tag. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return Sequence of job IDs requested to be interrupted. + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt an operation of this session, given its job ID. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return The job ID requested to be interrupted, as a single-element sequence, or an empty + *sequence if the operation is not started by this session. 
+ * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1736187746 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala: ## Review Comment: LOL maybe trim this down a bit :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1734760745 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( Review Comment: Done! We now use a hidden tag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1734759963 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( + _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt all currently running operations of this session with the given operation + * tag. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return Sequence of job IDs requested to be interrupted. + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt an operation of this session, given its job ID. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return The job ID requested to be interrupted, as a single-element sequence, or an empty + *sequence if the operation is not started by this session. 
+ * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { +scala.util.Try(jobId.toInt).toOption match { + case Some(jobIdToBeCancelled) => +val cancelledIds = sparkContext.cancelJob( + jobIdToBeCancelled, + "Interrupted by user", + shouldCancelJob = _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID Review Comment: Yes. We don't need this now :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1734759369 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -1170,7 +1311,12 @@ object SparkSession extends Logging { * @since 2.0.0 */ def setActiveSession(session: SparkSession): Unit = { +clearActiveSession() activeThreadSession.set(session) +if (session != null) { + session.sparkContext.setLocalProperty(SPARK_SESSION_UUID_PROPERTY_KEY, session.sessionUUID) + session.userDefinedToRealTagsMap.values().asScala.foreach(session.sparkContext.addJobTag) Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1734187514 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( Review Comment: I tried this approach before. The problem is that adding a tag will break many tests, as they assume a job has no tag unless added. We could make those tests happy by prefixing the tag with some special characters such as `__hidden_tag__` and do not return them by default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
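A minimal sketch of the hidden-tag idea described above, assuming a reserved prefix that user code cannot legally pass to `addTag`; all names here are illustrative:

```scala
object HiddenTags {
  // Hypothetical reserved prefix for internally-managed tags.
  private val HiddenTagPrefix = "__hidden_tag__"

  def hide(tag: String): String = HiddenTagPrefix + tag

  def isHidden(tag: String): Boolean = tag.startsWith(HiddenTagPrefix)

  // Only user-visible tags are reported back, so existing tests that expect
  // "no tags unless explicitly added" keep passing.
  def visible(allTags: Set[String]): Set[String] = allTags.filterNot(isHidden)
}
```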
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1733922553 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( + _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt all currently running operations of this session with the given operation + * tag. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return Sequence of job IDs requested to be interrupted. + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt an operation of this session, given its job ID. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return The job ID requested to be interrupted, as a single-element sequence, or an empty + *sequence if the operation is not started by this session. 
+ * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { +scala.util.Try(jobId.toInt).toOption match { + case Some(jobIdToBeCancelled) => +val cancelledIds = sparkContext.cancelJob( + jobIdToBeCancelled, + "Interrupted by user", + shouldCancelJob = _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID Review Comment: Why is this check needed? To prevent people from cancelling other people's jobs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1733919998 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -1170,7 +1311,12 @@ object SparkSession extends Logging { * @since 2.0.0 */ def setActiveSession(session: SparkSession): Unit = { +clearActiveSession() activeThreadSession.set(session) +if (session != null) { + session.sparkContext.setLocalProperty(SPARK_SESSION_UUID_PROPERTY_KEY, session.sessionUUID) + session.userDefinedToRealTagsMap.values().asScala.foreach(session.sparkContext.addJobTag) Review Comment: I am not sure if this is bullet proof. It is better to set this up in `SQLExecution`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
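A rough sketch of what moving the setup into `SQLExecution` could look like: attach the session's managed tags to the `SparkContext` only for the duration of an execution, and always remove them again afterwards. `addJobTag` and `removeJobTag` are existing `SparkContext` APIs; the helper itself and the access to `managedJobTags` are assumptions:

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.SparkSession

// Hypothetical helper; in the PR the equivalent logic would live inside
// SQLExecution.withNewExecutionId rather than in SparkSession.setActiveSession.
def withSessionJobTags[T](session: SparkSession)(body: => T): T = {
  val sc = session.sparkContext
  val tags = session.managedJobTags.values().asScala.toSet // assumes visibility of managedJobTags
  tags.foreach(sc.addJobTag)
  try body
  finally tags.foreach(sc.removeJobTag)
}
```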
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1733919350 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( + _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt all currently running operations of this session with the given operation + * tag. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return Sequence of job IDs requested to be interrupted. + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Request to interrupt an operation of this session, given its job ID. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + * + * @return The job ID requested to be interrupted, as a single-element sequence, or an empty + *sequence if the operation is not started by this session. 
+ * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { Review Comment: There are two things we should address here: - Operation roughly equates to query (at least it does in connect land). A query can have many jobs. I don't think this is semantically right if the implementations differ as much as they do. - We should probably use something that is findable in the UI. How about we use the root SQL Execution ID? We can just create another tag for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
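To make the second point concrete: if every job of a query also carries a tag derived from the root SQL execution ID, `interruptOperation` can cancel by that tag and the same ID stays visible in the SQL tab of the UI. The tag format below is purely illustrative; the PR ultimately adds a helper for this in `SQLExecution`:

```scala
// Hypothetical shape of a per-execution tag keyed by the root SQL execution ID.
def executionIdJobTag(sessionUUID: String, rootExecutionId: Long): String =
  s"spark-session-$sessionUUID-execution-root-id-$rootExecutionId"
```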
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1733915208 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +814,120 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Request to interrupt all currently running operations of this session. + * + * @note This method will wait up to 60 seconds for the interruption request to be issued. + + * @return Sequence of job IDs requested to be interrupted. + + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs( Review Comment: I would just create a tag for this session and cancel on that. No need to make this more complicated. You need to be a bit careful with parent to child inheritance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
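A sketch of the "one tag per session" idea. If every session, including cloned ones, carries its own UUID-based tag, interrupting one session does not silently take down another; the inheritance question then reduces to whether a clone also copies the parent's user-defined tags. Names and wiring below are assumptions:

```scala
import org.apache.spark.SparkContext

// Hypothetical: interruptAll collapses to a single tag cancellation.
def interruptAllForSession(sc: SparkContext, sessionUUID: String): Unit = {
  val sessionTag = s"spark-session-$sessionUUID" // one tag owned by this session
  sc.cancelJobsWithTag(sessionTag)
}
```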
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1732437269 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { +setLocalProperty(SparkContext.SPARK_SESSION_UUID, uuid) Review Comment: I stopped mentioning "SparkSession" in Core and use setLocalProperty instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1732403023 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1732401097 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { Review Comment: Done! Now the scheduler will first collect job IDs that will be cancelled, then unblock the caller, and then process the cancellation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
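A heavily simplified sketch of the described ordering, with hypothetical names; the real change lives in the DAGScheduler's event loop. The only point is that the returned future is completed with the matched job IDs before the cancellations themselves are processed:

```scala
import scala.concurrent.{Future, Promise}

// 1. collect the job IDs that match, 2. unblock the caller by completing the
// promise, 3. only then process the actual cancellation of each job.
def cancelMatchingJobs(activeJobIds: Seq[Int], cancelOne: Int => Unit): Future[Seq[Int]] = {
  val promise = Promise[Seq[Int]]()
  val toCancel = activeJobIds
  promise.success(toCancel)
  toCancel.foreach(cancelOne)
  promise.future
}
```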
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1731485144 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt an operation of this session with the given Job ID. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { +scala.util.Try(jobId.toInt).toOption match { + case Some(jobIdToBeCancelled) => +val cancelledIds = sparkContext.cancelJob( + jobIdToBeCancelled, + "Interrupted by user", + shouldCancelJob = _.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq Review Comment: Done! 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2309874971 > I feel like what you're doing here is similar to `JobArtifactSet`. It has things to do with `SparkContext`, but we separated it into `JobArtifactSet` with a state so we can decouple Spark core from Spark SQL. Yes exactly. Basically the equivalent of `JobArtifactSet.withActiveJobArtifactState` is `SparkSession.withActive`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
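For readers who know neither API: the shared idea is "install some per-session state for the duration of a block, then restore whatever was there before". A generic sketch of that pattern, which is not the implementation of either method:

```scala
// Generic set-run-restore pattern that JobArtifactSet.withActiveJobArtifactState and
// SparkSession.withActive both follow in spirit, shown here over a plain ThreadLocal.
def withThreadLocal[S, T](holder: ThreadLocal[S], state: S)(block: => T): T = {
  val previous = holder.get()
  holder.set(state)
  try block
  finally holder.set(previous)
}
```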
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1731031119 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { +setLocalProperty(SparkContext.SPARK_SESSION_UUID, uuid) Review Comment: How about removing this API and using `setLocalProperty` instead? Also the `getSparkSessionUUID` API in ActiveJob will be removed. From Session we'll directly inspect the properties. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
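A sketch of the property-based alternative: the SQL layer records the session UUID as an ordinary local property, so core never needs to know what a SparkSession is. The property key below is made up:

```scala
import org.apache.spark.SparkContext

// Hypothetical key; core only ever sees a plain thread-local property.
val sessionUuidKey = "spark.sql.session.uuid"

def recordSessionUuid(sc: SparkContext, sessionUUID: String): Unit =
  sc.setLocalProperty(sessionUuidKey, sessionUUID)

// The SQL side can later read it back via sc.getLocalProperty(sessionUuidKey) on the
// submitting thread, or from the properties attached to a submitted job.
```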
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2309825027 I feel like what you're doing here is similar to `JobArtifactSet`. It has things to do with `SparkContext`, but we separated it into `JobArtifactSet` with a state so we can decouple Spark core from Spark SQL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1731006930 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Tags set in parent sessions should be set in child sessions too. Can we match this with the Spark Connect cancellation implementation? It already does so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1731004930 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { +setLocalProperty(SparkContext.SPARK_SESSION_UUID, uuid) Review Comment: Spark core has to be separated from Spark SQL, and now we're putting SQL stuff into Spark core. Can we avoid this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730990466 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt an operation of this session with the given Job ID. + * + * @return + * sequence of Job IDs of interrupted operations. 
+ * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { +scala.util.Try(jobId.toInt).toOption match { + case Some(jobIdToBeCancelled) => +val cancelledIds = sparkContext.cancelJob( + jobIdToBeCancelled, + "Interrupted by user", + shouldCancelJob = _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq Review Comment: Now it blocks for at most 60s for all jobs to be cancelled. This waiting time will be much shorter after I change the code to make it non-blocking (only waiting to enumerate the affected jobs)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730986833 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Good point. I'll change the code. Btw, how should we handle session IDs? Should calling `interruptAll` from parent sessions terminate jobs from child sessions? ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Good point. I'll change the code. Btw, how should we handle session IDs? Should calling `interruptAll` from parent sessions terminate jobs from child sessions (I think we should)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730983717 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { Review Comment: Oh good to know, I think it's possible to invoke the callback (with Job IDs which are going to be cancelled) earlier to make it non-blocking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730981763 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { +setLocalProperty(SparkContext.SPARK_SESSION_UUID, uuid) Review Comment: Here I'm trying to inject session UUID into new jobs, so from Session we could cancel them all. Do you have concerns about this approach or the naming? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
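A rough sketch of the injection path described in that reply, assuming only the public `SparkContext.setLocalProperty`/`getLocalProperty` APIs (the property key below is a placeholder, not the constant used in the PR): the session writes its UUID into the submitting thread's local job properties, so every job launched from that thread carries it and can later be matched when cancelling.

```scala
import org.apache.spark.SparkContext

object SessionUUIDPropagation {
  // Run `body` with a hypothetical session-UUID property set on the submitting thread, restoring
  // the previous value afterwards (setLocalProperty with null removes the property).
  def runWithSessionUUID[T](sc: SparkContext, sessionUUID: String)(body: => T): T = {
    val key = "spark.hypothetical.sessionUUID"
    val previous = sc.getLocalProperty(key)
    sc.setLocalProperty(key, sessionUUID) // inherited by jobs submitted from this thread
    try body finally sc.setLocalProperty(key, previous)
  }
}
```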
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730964549 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2708,13 +2741,51 @@ class SparkContext(config: SparkConf) extends Logging { def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) assertNotStopped() -dagScheduler.cancelJobsWithTag(tag, None) +dagScheduler.cancelJobsWithTag( + tag, + reason = None, + shouldCancelJob = None, + cancelledJobs = None) + } + + /** + * Cancel all jobs that have been scheduled or are running. + * + * @param shouldCancelJob Callback function to be called with the job ID of each job that matches + *the given tag. If the function returns true, the job will be cancelled. + * @return A future that will be completed with the set of job IDs that were cancelled. + */ + def cancelAllJobs(shouldCancelJob: ActiveJob => Boolean): Future[Set[Int]] = { Review Comment: Yeah let's make them `private[spark]` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1730962280 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2708,13 +2741,51 @@ class SparkContext(config: SparkConf) extends Logging { def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) assertNotStopped() -dagScheduler.cancelJobsWithTag(tag, None) +dagScheduler.cancelJobsWithTag( + tag, + reason = None, + shouldCancelJob = None, + cancelledJobs = None) + } + + /** + * Cancel all jobs that have been scheduled or are running. + * + * @param shouldCancelJob Callback function to be called with the job ID of each job that matches + *the given tag. If the function returns true, the job will be cancelled. + * @return A future that will be completed with the set of job IDs that were cancelled. + */ + def cancelAllJobs(shouldCancelJob: ActiveJob => Boolean): Future[Set[Int]] = { Review Comment: I think I can just make these ones private. They're added here to be used solely by the new APIs in SparkSession. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729789523 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { +val realTag = userDefinedToRealTagsMap.get(tag) +if (realTag == null) return Seq.empty + +val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by user", _ => true) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt an operation of this session with the given Job ID. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptOperation(jobId: String): Seq[String] = { +scala.util.Try(jobId.toInt).toOption match { + case Some(jobIdToBeCancelled) => +val cancelledIds = sparkContext.cancelJob( + jobIdToBeCancelled, + "Interrupted by user", + shouldCancelJob = _.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq Review Comment: Can you document this? 
Do we block and wait 60 secs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729789441 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Otherwise, jobs like broadcast hash joins won't be cancelled IIRC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729789088 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2708,13 +2741,51 @@ class SparkContext(config: SparkConf) extends Logging { def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) assertNotStopped() -dagScheduler.cancelJobsWithTag(tag, None) +dagScheduler.cancelJobsWithTag( + tag, + reason = None, + shouldCancelJob = None, + cancelledJobs = None) + } + + /** + * Cancel all jobs that have been scheduled or are running. + * Review Comment: Let's add ` @since 4.0.0` for all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729788722 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -122,6 +124,12 @@ class SparkSession private( .getOrElse(SQLConf.getFallbackConf) }) + /** + * A map to hold the mapping from user-defined tags to the real tags attached to Jobs. + * Real tag have the current session ID attached: `"tag1" -> s"spark-$sessionUUID-tag1"`. + */ + private val userDefinedToRealTagsMap: ConcurrentHashMap[String, String] = new ConcurrentHashMap() Review Comment: Shouldn't we inherit this when somebody clone a session from a parent session? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729788480 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +802,114 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag") + } + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = { +SparkContext.throwIfInvalidTag(tag) +userDefinedToRealTagsMap.remove(tag) + } + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = userDefinedToRealTagsMap.clear() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { +val cancelledIds = sparkContext.cancelAllJobs(_.getSparkSessionUUID.contains(sessionUUID)) +ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq + } + + /** + * Interrupt all operations of this session with the given operation tag. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptTag(tag: String): Seq[String] = { Review Comment: Interrupt API is non-blocking IIRC, also it should work with streaming queries too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729787617 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { Review Comment: I would prefer to use the same approach with `JobArtifactSet`. We shouldn't add the concept of SQL/SparkSession here. You can hold a default key here, and set the state within `SparkSession`, see also https://github.com/apache/spark/commit/57bbb4c6e7309e0beaeb3618978dcaf75ce9a7fe#diff-1e596e9e9bd50e3075847a66f8fecee741289b2b0478f9099c3bf2b680ebaa35R1741 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
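As a hedged sketch of the "same approach as `JobArtifactSet`" layering suggested here (all names are illustrative; this is not the actual `JobArtifactSet` code): core holds only an opaque, optional piece of state with a default value in an `InheritableThreadLocal`, and the SQL layer (`SparkSession`) is the one that sets it around its own executions.

```scala
// Core-side holder: knows nothing about SQL or SparkSession, only an optional opaque value.
object ActiveSessionStateSketch {
  private val state = new InheritableThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None // the "default key": no session attached
  }

  def current: Option[String] = state.get()

  // Called from the SQL layer, never from core, to scope a session UUID to a block of work.
  def withSession[T](sessionUUID: String)(body: => T): T = {
    val previous = state.get()
    state.set(Some(sessionUUID))
    try body finally state.set(previous)
  }
}
```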
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729786760 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -825,6 +826,11 @@ class SparkContext(config: SparkConf) extends Logging { def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull + /** Set the UUID of the Spark session that starts the current job. */ + def setSparkSessionUUID(uuid: String): Unit = { +setLocalProperty(SparkContext.SPARK_SESSION_UUID, uuid) Review Comment: SparkContext is only one. How does this relate to Spark Session UUID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1729786688 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2708,13 +2741,51 @@ class SparkContext(config: SparkConf) extends Logging { def cancelJobsWithTag(tag: String): Unit = { SparkContext.throwIfInvalidTag(tag) assertNotStopped() -dagScheduler.cancelJobsWithTag(tag, None) +dagScheduler.cancelJobsWithTag( + tag, + reason = None, + shouldCancelJob = None, + cancelledJobs = None) + } + + /** + * Cancel all jobs that have been scheduled or are running. + * + * @param shouldCancelJob Callback function to be called with the job ID of each job that matches + *the given tag. If the function returns true, the job will be cancelled. + * @return A future that will be completed with the set of job IDs that were cancelled. + */ + def cancelAllJobs(shouldCancelJob: ActiveJob => Boolean): Future[Set[Int]] = { Review Comment: This is a new API, it's not adding the API back from Connect to core. Let's separate it from this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2307171864 @HyukjinKwon @hvanhovell This PR is now ready for review. Could you take a look? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1725191021 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) Review Comment: Done. We now store all user-provided tags in the session, and on the backend, we actually attach `spark-$sessionUUID-$tag` to jobs started by this session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
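An illustrative sketch (not the PR's actual code) of what "attach `spark-$sessionUUID-$tag` to jobs started by this session" can look like, assuming the existing public `addJobTag`/`removeJobTag` APIs on `SparkContext`: the session keeps the user tags, and only while an action runs are the namespaced tags attached on the submitting thread.

```scala
import org.apache.spark.SparkContext

object SessionTagAttachment {
  // Translate the session's user tags into namespaced tags, attach them for the duration of the
  // action, and detach them afterwards so other threads/sessions are unaffected.
  def withSessionTags[T](
      sc: SparkContext,
      sessionUUID: String,
      userTags: Set[String])(body: => T): T = {
    val realTags = userTags.map(tag => s"spark-session-$sessionUUID-$tag")
    realTags.foreach(sc.addJobTag) // picked up by all jobs started by `body` on this thread
    try body finally realTags.foreach(sc.removeJobTag)
  }
}
```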
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1725187692 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = sparkContext.removeJobTag(tag) + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = sparkContext.getJobTags() + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = sparkContext.clearJobTags() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { Review Comment: Thanks for the comment! I implemented this with a tag that contains the session ID. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
xupefei commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2302228817 > Could we file a JIRA for Python API set too? Just to make sure we don't miss it out Done! https://issues.apache.org/jira/browse/SPARK-49337 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
HyukjinKwon commented on PR #47815: URL: https://github.com/apache/spark/pull/47815#issuecomment-2300700363 Could we file a JIRA for Python API set too? Just to make sure we don't miss it out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1723425528 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) Review Comment: I think we also need to make sure the tag is unique to this session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1723421040 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = sparkContext.removeJobTag(tag) + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = sparkContext.getJobTags() + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = sparkContext.clearJobTags() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { Review Comment: We probably should add one tag that is exclusively tied to the session and use that to cancel. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
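A small sketch of that suggestion, assuming only the public `addJobTag`/`removeJobTag`/`cancelJobsWithTag` APIs (class and method names are hypothetical): derive one tag from the session's UUID, attach it to every job the session runs, and implement `interruptAll` as a plain tag cancellation so that only this session's jobs are affected.

```scala
import org.apache.spark.SparkContext

class SessionScopedCancellation(sc: SparkContext, sessionUUID: String) {
  private val sessionTag = s"spark-session-$sessionUUID"

  // Every action executed through the session carries the session-exclusive tag.
  def runAction[T](body: => T): T = {
    sc.addJobTag(sessionTag)
    try body finally sc.removeJobTag(sessionTag)
  }

  // Cancels only jobs carrying this session's tag, not everything on the shared SparkContext.
  def interruptAll(): Unit = sc.cancelJobsWithTag(sessionTag)
}
```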
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1723419637 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) Review Comment: We may have to take a small step back here. I think we have to track the tags in the session it self, and only set them when we execute an action. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1723411973 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) Review Comment: This seems to make the strong assumption that a session is only used in a single thread. That does not need to hold. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49249][SPARK-49320] Add new tag-related APIs in Connect back to Spark Core [spark]
hvanhovell commented on code in PR #47815: URL: https://github.com/apache/spark/pull/47815#discussion_r1723408463 ## sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -794,6 +795,108 @@ class SparkSession private( } } + + /** + * Add a tag to be assigned to all the operations started by this thread in this session. + * + * Often, a unit of execution in an application consists of multiple Spark executions. + * Application programmers can use this method to group all those jobs together and give a group + * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all + * running executions with this tag. For example: + * {{{ + * // In the main thread: + * spark.addTag("myjobs") + * spark.range(10).map(i => { Thread.sleep(10); i }).collect() + * + * // In a separate thread: + * spark.interruptTag("myjobs") + * }}} + * + * There may be multiple tags present at the same time, so different parts of application may + * use different tags to perform cancellation at different levels of granularity. + * + * @param tag + * The tag to be added. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def addTag(tag: String): Unit = sparkContext.addJobTag(tag) + + /** + * Remove a tag previously added to be assigned to all the operations started by this thread in + * this session. Noop if such a tag was not added earlier. + * + * @param tag + * The tag to be removed. Cannot contain ',' (comma) character or be an empty string. + * + * @since 4.0.0 + */ + def removeTag(tag: String): Unit = sparkContext.removeJobTag(tag) + + /** + * Get the tags that are currently set to be assigned to all the operations started by this + * thread. + * + * @since 4.0.0 + */ + def getTags(): Set[String] = sparkContext.getJobTags() + + /** + * Clear the current thread's operation tags. + * + * @since 4.0.0 + */ + def clearTags(): Unit = sparkContext.clearJobTags() + + /** + * Interrupt all operations of this session that are currently running. + * + * @return + * sequence of Job IDs of interrupted operations. + * + * @since 4.0.0 + */ + def interruptAll(): Seq[String] = { Review Comment: This currently seems to cancel all running queries, not just the ones owned by this session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org