Github user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/21068#discussion_r185791498
--- Diff:
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.util.Arrays
+import java.util.Collections
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import
org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
+import org.apache.spark.internal.config.{MAX_FAILED_EXEC_PER_NODE, _}
+import org.apache.spark.util.ManualClock
+
+class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with
Matchers
+ with BeforeAndAfterEach {
+
+ val BLACKLIST_TIMEOUT = 100L
+ val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
+
+ var amClientMock: AMRMClient[ContainerRequest] = _
+ var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
+ var failureTracker: FailureTracker = _
+ var clock: ManualClock = _
+
+ override def beforeEach(): Unit = {
+ val sparkConf = new SparkConf()
+ sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
+ sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
+ sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
+ clock = new ManualClock()
+
+ amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
+ failureTracker = new FailureTracker(sparkConf, clock)
+ yarnBlacklistTracker =
+ new YarnAllocatorBlacklistTracker(sparkConf, amClientMock,
failureTracker)
+ yarnBlacklistTracker.setNumClusterNodes(4)
+ super.beforeEach()
+ }
+
+ test("expiring its own blacklisted nodes") {
+ (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
+ _ => {
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+ // host should not be blacklisted at these failures as
MAX_FAILED_EXEC_PER_NODE is 2
+ verify(amClientMock, never())
+ .updateBlacklist(Arrays.asList("host"), Collections.emptyList())
+ }
+ }
+
+ yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+ // the third failure on the host triggers the blacklisting
+ verify(amClientMock).updateBlacklist(Arrays.asList("host"),
Collections.emptyList())
+
+ clock.advance(BLACKLIST_TIMEOUT)
+
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Map())
--- End diff --
This call is needed to trigger synchronization of the blacklisted nodes with YARN. I have added a comment above to explain this.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]