Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/5212#discussion_r29912482
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -495,6 +497,45 @@ class StreamingContext private[streaming] (
)
}
+  private def isCoresEnoughInLocalMode(): Boolean = {
+    val risNum = graph.getReceiverInputStreams().size
+    if (risNum == 0) {
+      return true
+    }
+
+    // Regular expression used for local[N] and local[*] master formats
+    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+    // Regular expression for local[N, maxRetries], used in tests with failing tasks
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+
+    val master = sc.conf.get("spark.master")
+    val coresPerTask = sc.conf.getInt("spark.task.cpus", 1)
+    val maxTaskNum = master match {
+      case "local" => 1 / coresPerTask
+
+      case LOCAL_N_REGEX(threads) =>
--- End diff --
This effectively replicates the logic in SparkContext, DAGScheduler, etc.
This is not good: things can silently break if any of Spark's internal logic
changes. A better solution might be to hook into Spark's internals (the
DAGScheduler, etc.) to see the number of cores it actually chooses.
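
For illustration, here is a rough sketch of that direction. It is hypothetical
(the method name is made up, and it is not tested against the scheduler
internals); the point is that `sc.isLocal` and `sc.defaultParallelism` are
existing SparkContext APIs whose values are computed by the TaskScheduler
itself, so the master string never has to be re-parsed here:

    // Hypothetical sketch: ask the scheduler for its parallelism instead of
    // re-deriving the core count from the master string.
    private def hasEnoughCoresForReceivers(): Boolean = {
      val receiverCount = graph.getReceiverInputStreams().size
      if (receiverCount == 0) return true

      // Non-local masters can acquire more executors, so skip the check there.
      if (!sc.isLocal) return true

      // defaultParallelism is computed by the TaskScheduler, so there is no
      // need to re-parse local[N] / local[N, maxRetries] / local-cluster[...].
      // Caveat: spark.default.parallelism, if set, overrides the core count.
      val coresPerTask = sc.conf.getInt("spark.task.cpus", 1)
      val maxConcurrentTasks = sc.defaultParallelism / coresPerTask

      // Receivers occupy one task slot each; at least one slot must remain
      // free for processing the received data.
      maxConcurrentTasks > receiverCount
    }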