sunchao commented on a change in pull request #25840:
URL: https://github.com/apache/spark/pull/25840#discussion_r490561528
##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -147,26 +147,46 @@ object FileCommitProtocol extends Logging {
className: String,
jobId: String,
outputPath: String,
- dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+ dynamicPartitionOverwrite: Boolean = false,
+ restrictions: Map[String, _] = Map.empty): FileCommitProtocol = {
logDebug(s"Creating committer $className; job $jobId; output=$outputPath;"
+
s" dynamic=$dynamicPartitionOverwrite")
val clazz = Utils.classForName[FileCommitProtocol](className)
// First try the constructor with arguments (jobId: String, outputPath: String,
+ // dynamicPartitionOverwrite: Boolean,
+ // restrictions: Map[String, _]).
+ // If that doesn't exist, try the one with (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
- // If that doesn't exist, try the one with (jobId: string, outputPath: String).
+ // If that still doesn't exist, try the one with (jobId: string, outputPath: String).
try {
- val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
- logDebug("Using (String, String, Boolean) constructor")
- ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+ val ctor = clazz.getDeclaredConstructor(
Review comment:
Instead of this nested structure for trying different ctors, maybe we
can use the following?
```scala
try {
val ctor = clazz.getDeclaredConstructor(..) // try ctor #1
ctor.newInstance(...)
} catch {
case _: NoSuchMethodException =>
// handle the case
}
try {
val ctor = clazz.getDeclaredConstructor(..) // try ctor #2
ctor.newInstance(...)
} catch {
case _: NoSuchMethodException =>
// handle the case
}
...
// no suitable ctor, throw exception
throw new RuntimeException("No constructor found!")
```
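To make that concrete, a rough sketch of the flat version (`tryCtor` is just a made-up local helper here, reusing `clazz`, `jobId`, `outputPath`, `dynamicPartitionOverwrite` and `restrictions` already in scope in `instantiate`):
```scala
// Sketch only: a flat sequence of constructor lookups instead of nesting.
// The three signatures mirror the ones instantiate() already tries.
def tryCtor(args: AnyRef*)(paramTypes: Class[_]*): Option[FileCommitProtocol] =
  try {
    Some(clazz.getDeclaredConstructor(paramTypes: _*).newInstance(args: _*))
  } catch {
    case _: NoSuchMethodException => None
  }

tryCtor(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean], restrictions)(
    classOf[String], classOf[String], classOf[Boolean], classOf[Map[_, _]])
  .orElse(tryCtor(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])(
    classOf[String], classOf[String], classOf[Boolean]))
  .orElse(tryCtor(jobId, outputPath)(classOf[String], classOf[String]))
  .getOrElse(throw new RuntimeException("No suitable constructor found!"))
```
Chaining through `Option` also avoids the fall-through issue of two sequential try blocks, where a successful first lookup would otherwise still run the second one.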
##########
File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
##########
@@ -147,26 +147,46 @@ object FileCommitProtocol extends Logging {
className: String,
jobId: String,
outputPath: String,
- dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+ dynamicPartitionOverwrite: Boolean = false,
+ restrictions: Map[String, _] = Map.empty): FileCommitProtocol = {
Review comment:
Perhaps we'll need to update the top-level comments for this class as well, to include the additional ctor?
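For example, something along these lines (purely illustrative wording, not the existing scaladoc):
```scala
/**
 * ...
 * `instantiate()` first looks for a constructor taking
 * (jobId: String, outputPath: String, dynamicPartitionOverwrite: Boolean, restrictions: Map[String, _]),
 * then falls back to the (String, String, Boolean) and (String, String) signatures.
 */
```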
##########
File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
##########
@@ -78,10 +80,43 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
.foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
}
- val committer = FileCommitProtocol.instantiate(
- sparkSession.sessionState.conf.fileCommitProtocolClass,
- jobId = java.util.UUID.randomUUID().toString,
- outputPath = outputLocation)
+ val committer = if (numDynamicPartitions > 0) {
+ val perTaskRestriction = SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key -> (
+ Option(hadoopConf.get("hive.exec.max.dynamic.partitions.pernode")) match {
+ case Some(num) => num.toInt
+ case None => sparkSession.sessionState.conf.maxDynamicPartitionsPerTask
+ })
+
+ val totalRestriction = SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key -> (
+ Option(hadoopConf.get("hive.exec.max.dynamic.partitions")) match {
+ case Some(num) => num.toInt
+ case None => sparkSession.sessionState.conf.maxDynamicPartitions
+ })
+
+ val fileRestriction = SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> (
+ Option(hadoopConf.get("hive.exec.max.created.files")) match {
+ case Some(num) => num.toInt
+ case None => sparkSession.sessionState.conf.maxCreatedFilesInDynamicPartition
+ })
+
+ val dynamicPartitionRestrictions: Map[String, _] = Map(
+ totalRestriction,
+ perTaskRestriction,
+ fileRestriction
+ )
+
+ FileCommitProtocol.instantiate(
+ sparkSession.sessionState.conf.fileCommitProtocolClass,
+ jobId = java.util.UUID.randomUUID().toString,
+ outputPath = outputLocation,
+ dynamicPartitionOverwrite = true,
Review comment:
Previously this flag was set to false; why do we switch it to true here?
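Separately, a note on the hunk above: each restriction takes the Hive limit from `hadoopConf` when it is set and otherwise falls back to the new SQLConf default. A more compact way to express that could look roughly like this (sketch only; `hiveLimitOrDefault` is a made-up helper, everything else reuses names from the diff):
```scala
// Illustrative only: read a Hive limit from hadoopConf if present, else fall back to the SQLConf value.
def hiveLimitOrDefault(hiveKey: String, default: Int): Int =
  Option(hadoopConf.get(hiveKey)).map(_.toInt).getOrElse(default)

val dynamicPartitionRestrictions: Map[String, _] = Map(
  SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key ->
    hiveLimitOrDefault("hive.exec.max.dynamic.partitions",
      sparkSession.sessionState.conf.maxDynamicPartitions),
  SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key ->
    hiveLimitOrDefault("hive.exec.max.dynamic.partitions.pernode",
      sparkSession.sessionState.conf.maxDynamicPartitionsPerTask),
  SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key ->
    hiveLimitOrDefault("hive.exec.max.created.files",
      sparkSession.sessionState.conf.maxCreatedFilesInDynamicPartition))
```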
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2016,6 +2016,30 @@ object SQLConf {
.checkValues(PartitionOverwriteMode.values.map(_.toString))
.createWithDefault(PartitionOverwriteMode.STATIC.toString)
+ val DYNAMIC_PARTITION_MAX_PARTITIONS =
+ buildConf("spark.sql.dynamic.partition.maxPartitions")
+ .doc("Maximum total number of dynamic partitions allowed to be created
by one DML. " +
+ s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} set to
" +
+
s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ .intConf
+ .createWithDefault(Int.MaxValue)
+
+ val DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK =
+ buildConf("spark.sql.dynamic.partition.maxPartitionsPerTask")
+ .doc("Maximum number of dynamic partitions allowed to be created per
task. " +
+ s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} set to
" +
+
s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ .intConf
+ .createWithDefault(Int.MaxValue)
+
+ val DYNAMIC_PARTITION_MAX_CREATED_FILES =
+ buildConf("spark.sql.dynamic.partition.maxCreatedFiles")
+ .doc("Maximum total number of files allowed to be created in dynamic
partitions write " +
+ "by one DML. This only takes effect when
${FILE_COMMIT_PROTOCOL_CLASS.key} set to " +
+
s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ .intConf
+ .createWithDefault(Int.MaxValue)
Review comment:
Consider making this `Long`?
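i.e. roughly (sketch only; the `.toInt` reads in SaveAsHiveFile would need to become `.toLong` as well):
```scala
val DYNAMIC_PARTITION_MAX_CREATED_FILES =
  buildConf("spark.sql.dynamic.partition.maxCreatedFiles")
    .doc("Maximum total number of files allowed to be created in dynamic partitions write " +
      s"by one DML. This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} set to " +
      s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
    .longConf
    .createWithDefault(Long.MaxValue)
```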
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2016,6 +2016,30 @@ object SQLConf {
.checkValues(PartitionOverwriteMode.values.map(_.toString))
.createWithDefault(PartitionOverwriteMode.STATIC.toString)
+ val DYNAMIC_PARTITION_MAX_PARTITIONS =
+ buildConf("spark.sql.dynamic.partition.maxPartitions")
+ .doc("Maximum total number of dynamic partitions allowed to be created
by one DML. " +
+ s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} set to
" +
+
s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+ .intConf
Review comment:
missing version info?
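For reference, that would roughly mean adding a `.version(...)` call to each new conf; the version string below is only a placeholder for whichever release this ends up in:
```scala
val DYNAMIC_PARTITION_MAX_PARTITIONS =
  buildConf("spark.sql.dynamic.partition.maxPartitions")
    .doc("Maximum total number of dynamic partitions allowed to be created by one DML. " +
      s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} set to " +
      s"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
    .version("3.1.0")  // placeholder: use the release this change actually targets
    .intConf
    .createWithDefault(Int.MaxValue)
```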