Github user sansagara commented on a diff in the pull request:
https://github.com/apache/spark/pull/12154#discussion_r177898973
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming.scheduler
+
+import scala.util.Random
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
+ * executors based on the statistics of the streaming computation. At a high level, the policy is:
+ * - Use the StreamingListener interface to get the batch processing times of completed batches
+ * - Periodically take the average batch completion time and compare it with the batch interval
+ * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
+ * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill an executor that
+ *   is not running a receiver
+ */
+private[streaming] class ExecutorAllocationManager(
+ client: ExecutorAllocationClient,
+ receiverTracker: ReceiverTracker,
+ conf: SparkConf,
+ batchDurationMs: Long,
+ clock: Clock) extends StreamingListener with Logging {
+
+ import ExecutorAllocationManager._
+
+ private val scalingIntervalSecs = conf.getTimeAsSeconds(
+ SCALING_INTERVAL_KEY,
+ s"${SCALING_INTERVAL_DEFAULT_SECS}s")
+ private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
+ private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
+ private val minNumExecutors = conf.getInt(
+ MIN_EXECUTORS_KEY,
+ math.max(1, receiverTracker.numReceivers))
+ private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
+ private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
+ _ => manageAllocation(), "streaming-executor-allocation-manager")
+
+ @volatile private var batchProcTimeSum = 0L
+ @volatile private var batchProcTimeCount = 0
+
+ validateSettings()
+
+ def start(): Unit = {
+ timer.start()
+ logInfo(s"ExecutorAllocationManager started with " +
+ s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval =
$scalingIntervalSecs sec")
+ }
+
+ def stop(): Unit = {
+ timer.stop(interruptTimer = true)
+ logInfo("ExecutorAllocationManager stopped")
+ }
+
+ private def manageAllocation(): Unit = synchronized {
+ logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio,
$scalingDownRatio]")
+ if (batchProcTimeCount > 0) {
+ val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
+ val ratio = averageBatchProcTime.toDouble / batchDurationMs
+ logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
+ if (ratio >= scalingUpRatio) {
+ logDebug("Requesting executors")
+ val numNewExecutors = math.max(math.round(ratio).toInt, 1)
+ requestExecutors(numNewExecutors)
+ } else if (ratio <= scalingDownRatio) {
+ logDebug("Killing executors")
+ killExecutor()
+ }
+ }
+ batchProcTimeSum = 0
+ batchProcTimeCount = 0
+ }
+
+ private def requestExecutors(numNewExecutors: Int): Unit = {
+ require(numNewExecutors >= 1)
+ val allExecIds = client.getExecutorIds()
+ logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+ val targetTotalExecutors =
+ math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
+ client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
+ logInfo(s"Requested total $targetTotalExecutors executors")
+ }
+
+ private def killExecutor(): Unit = {
+ val allExecIds = client.getExecutorIds()
+ logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
+
+ if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
+ val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
+ logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
+
+ val removableExecIds = allExecIds.diff(execIdsWithReceivers)
+ logDebug(s"Removable executors (${removableExecIds.size}):
${removableExecIds}")
+ if (removableExecIds.nonEmpty) {
+ val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
+ client.killExecutor(execIdToRemove)
+ logInfo(s"Requested to kill executor $execIdToRemove")
+ } else {
+ logInfo(s"No non-receiver executors to kill")
+ }
+ } else {
+ logInfo("No available executor to kill")
+ }
+ }
+
+ private def addBatchProcTime(timeMs: Long): Unit = synchronized {
+ batchProcTimeSum += timeMs
+ batchProcTimeCount += 1
+ logDebug(
+ s"Added batch processing time $timeMs, sum = $batchProcTimeSum,
count = $batchProcTimeCount")
+ }
+
+ private def validateSettings(): Unit = {
+ require(
+ scalingIntervalSecs > 0,
+ s"Config $SCALING_INTERVAL_KEY must be more than 0")
+
+ require(
+ scalingUpRatio > 0,
+ s"Config $SCALING_UP_RATIO_KEY must be more than 0")
+
+ require(
+ scalingDownRatio > 0,
+ s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
+
+ require(
+ minNumExecutors > 0,
+ s"Config $MIN_EXECUTORS_KEY must be more than 0")
+
+ require(
+ maxNumExecutors > 0,
+ s"$MAX_EXECUTORS_KEY must be more than 0")
+
+ require(
+ scalingUpRatio > scalingDownRatio,
+ s"Config $SCALING_UP_RATIO_KEY must be more than config
$SCALING_DOWN_RATIO_KEY")
+
+ if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
+ require(
+ maxNumExecutors >= minNumExecutors,
+ s"Config $MAX_EXECUTORS_KEY must be more than config
$MIN_EXECUTORS_KEY")
+ }
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ logDebug("onBatchCompleted called: " + batchCompleted)
+ if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
+ batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
+ }
+ }
+}
+
+private[streaming] object ExecutorAllocationManager extends Logging {
+ val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
+
+ val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
+ val SCALING_INTERVAL_DEFAULT_SECS = 60
+
+ val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
+ val SCALING_UP_RATIO_DEFAULT = 0.9
+
+ val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
+ val SCALING_DOWN_RATIO_DEFAULT = 0.3
+
+ val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
+
+ val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
--- End diff --
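Just to check my reading of `manageAllocation()` with the default ratios from the companion object (the batch interval and processing times below are hypothetical):

```scala
// Hypothetical numbers: 10s batch interval, 9.5s average batch processing time.
val ratio = 9500.0 / 10000                                  // 0.95 >= scalingUpRatio (0.9)
val numNewExecutors = math.max(math.round(ratio).toInt, 1)  // round(0.95) = 1, so 1 more executor is requested
// If the average dropped to 2s, ratio = 0.2 <= scalingDownRatio (0.3), and one
// randomly chosen non-receiver executor would be killed instead.
```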
@tdas @andrewor14 I also have to ask: is there any reason `initExecutors` is not supported for streaming with dynamic allocation? I'm having issues with my application because it needs a minimum number of executors to behave well with the Kinesis stream.
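In the meantime, a minimal sketch of the workaround I'm considering, using only the configs defined in this PR (the executor count is hypothetical): pin a floor with `spark.streaming.dynamicAllocation.minExecutors` so the manager never scales below what the Kinesis receivers need.

```scala
import org.apache.spark.SparkConf

// Hypothetical floor of 4 executors; keys match ENABLED_KEY / MIN_EXECUTORS_KEY above.
val conf = new SparkConf()
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "4")
```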