kbendick commented on a change in pull request #33508:
URL: https://github.com/apache/spark/pull/33508#discussion_r694351939



##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -323,6 +323,16 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_ALLOCATION_PODSALLOCATOR =
+    ConfigBuilder("spark.kubernetes.allocation.podsallocator")

Review comment:
       Nit: Possibly this config would be better named as 
`spark.kubernetes.allocation.pods.allocator`? Seems that `Allocator` is used as 
its own word and is capitalized in class names.
   
   I'd also update to `KUBERNETES_ALLOCATION_PODS_ALLOCATOR` if you make that 
change.
   
   

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
##########
@@ -242,6 +246,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 
1))
   }
 
+  test("Verify stopping deletes the labled pods") {

Review comment:
       Nit: typo in the word `labeled`.

##########
File path: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
##########
@@ -25,11 +29,24 @@ private[spark] trait BasicTestsSuite { k8sSuite: 
KubernetesSuite =>
 
   import BasicTestsSuite._
   import KubernetesSuite.k8sTestTag
+  import KubernetesSuite.{TIMEOUT, INTERVAL}
 
   test("Run SparkPi with no resources", k8sTestTag) {
     runSparkPiAndVerifyCompletion()
   }
 
+  test("Run SparkPi with no resources & statefulset allocation", k8sTestTag) {
+    sparkAppConf.set("spark.kubernetes.allocation.podsallocator", 
"statefulset")

Review comment:
       If you update the config name, don't forget to update here too =)

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
##########
@@ -138,6 +133,31 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
       podsPollingEventSource)
   }
 
+  private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, 
kubernetesClient: KubernetesClient,
+      snapshotsStore: ExecutorPodsSnapshotsStore) = {
+    val executorPodsAllocatorName = 
sc.conf.get(KUBERNETES_ALLOCATION_PODSALLOCATOR) match {
+      case "statefulset" =>
+        "org.apache.spark.scheduler.cluster.k8s.StatefulsetPodsAllocator"
+      case "direct" =>
+        "org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator"

Review comment:
       Nit: Possibly consider using some named constants or `classOf` with 
`.toString`? Can be left as a follow up too. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to