[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-11 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74481008
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
--- End diff --

It seems that the Hive metastore can't handle an RPC with millions of 
partitions; I will send a patch to do it in batches.
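
For context, a minimal sketch of the batching idea (not the actual patch): split the 
recovered partitions into fixed-size groups so that each metastore RPC stays bounded. 
The helper name and the batch size below are illustrative only.
```
// Sketch only: create partitions in bounded batches instead of one huge RPC.
// `batchSize` is an arbitrary illustrative value, not an existing Spark config.
def createPartitionsInBatches(
    catalog: SessionCatalog,
    tableName: TableIdentifier,
    parts: Seq[CatalogTablePartition],
    batchSize: Int = 100): Unit = {
  parts.grouped(batchSize).foreach { batch =>
    catalog.createPartitions(tableName, batch, ignoreIfExists = true)
  }
}
```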





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74132414
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
--- End diff --

No, this is true only for Hive <= 0.12; for Hive 0.13+ they are sent in a single 
RPC, so we should verify what the limit is for a single RPC.





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74132132
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
--- End diff --

Good question, see the implementation in HiveShim:
```
  // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
  override def createPartitions(
      hive: Hive,
      database: String,
      tableName: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = {
    val table = hive.getTable(database, tableName)
    parts.foreach { s =>
      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
      val spec = s.spec.asJava
      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
        // Ignore this partition since it already exists and ignoreIfExists == true
      } else {
        if (location == null && table.isView()) {
          throw new HiveException("LOCATION clause illegal for view partition");
        }

        createPartitionMethod.invoke(
          hive,
          table,
          spec,
          location,
          null, // partParams
          null, // inputFormat
          null, // outputFormat
          -1: JInteger, // numBuckets
          null, // cols
          null, // serializationLib
          null, // serdeParams
          null, // bucketCols
          null) // sortCols
      }
    }
  }
```
All these partitions will be inserted into Hive sequentially, so grouping them 
into batches will not help here.



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74123952
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
--- End diff --

What will happen if we get thousands or tens of thousands of new partitions?





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14500





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74110917
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+// TODO: Validate the value
+val value = PartitioningUtils.unescapePathName(ps(1))
--- End diff --

Yes, that makes sense.





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74100170
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
--- End diff --

I did not figure out how it works; at least `statuses.par(evalTaskSupport)` 
does not work.
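
For reference, a minimal standalone sketch (Scala 2.11) of how a custom pool is 
attached to a parallel collection; `.par` takes no arguments, so the task support 
has to be assigned afterwards, which is what the diff above does:
```
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val statuses = (1 to 100).toArray   // stand-in for the FileStatus array
val parStatuses = statuses.par
// There is no `.par(taskSupport)` overload; the pool is set through this field.
parStatuses.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val doubled = parStatuses.map(_ * 2)
```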





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74099592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+// TODO: Validate the value
+val value = PartitioningUtils.unescapePathName(ps(1))
--- End diff --

If the partitions were generated by Spark, they can be unescaped back 
correctly. For others, there could be compatibility issues. For example, Spark 
does not escape ` ` on Linux, so the unescaping of `%20` could be wrong (we could 
show a warning?). I think these are not in the scope of this PR.
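
To make the round-trip issue concrete, a hedged sketch in plain Scala, using 
`java.net.URLDecoder` only as a rough stand-in for 
`PartitioningUtils.unescapePathName`: a directory name written by an external tool 
may already contain a literal `%20`, and an unescaping pass will silently turn it 
into a space.
```
import java.net.URLDecoder

// Directory name as it appears on the filesystem (written by an external tool).
val dirName = "name=foo%20bar"
val Array(column, rawValue) = dirName.split("=", 2)
// Rough stand-in for PartitioningUtils.unescapePathName: decodes %XX sequences.
val value = URLDecoder.decode(rawValue, "UTF-8")
println(s"column=$column, value=$value")
// Prints: column=name, value=foo bar
// If the tool meant the literal string "foo%20bar", this recovered value is wrong.
```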



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74094542
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
--- End diff --

This is copied from UnionRDD.





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r74094235
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+// TODO: Validate the value
+val value = PartitioningUtils.unescapePathName(ps(1))
+// comparing with case-insensitive, but preserve the case
+if (columnName == partitionNames(0)) {
+  scanPartitions(
+spark, fs, filter, st.getPath, spec ++ Map(columnName -> 
value), partitionNames.drop(1))
+} else {
+  logWarning(s"expect partition column ${partitionNames(0)}, but 
got ${ps(0)}, ignore it")
+  Seq()
--- End diff --

Hive only throws an exception when there are disallowed characters in the 
value, not in other cases. I'd like to avoid adding any configs if there is no 
serious problem here.



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-08 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73975249
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+// TODO: Validate the value
+val value = PartitioningUtils.unescapePathName(ps(1))
+// comparing with case-insensitive, but preserve the case
+if (columnName == partitionNames(0)) {
+  scanPartitions(
+spark, fs, filter, st.getPath, spec ++ Map(columnName -> 
value), partitionNames.drop(1))
+} else {
+  logWarning(s"expect partition column ${partitionNames(0)}, but 
got ${ps(0)}, ignore it")
+  Seq()
--- End diff --

Like Hive, we may consider throwing an exception here (which could be turned 
off via a config).
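
A minimal sketch of what that suggestion could look like, reusing the names from 
the diff above; the config key `spark.sql.msck.strictPartitionNames` is made up 
here for illustration and is not an existing Spark setting:
```
// Hypothetical strict mode: fail on an unexpected directory name
// instead of just logging a warning and skipping it.
val strict = spark.conf.get("spark.sql.msck.strictPartitionNames", "false").toBoolean
if (columnName == partitionNames(0)) {
  scanPartitions(
    spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
} else if (strict) {
  throw new AnalysisException(
    s"expected partition column ${partitionNames(0)}, but found $columnName")
} else {
  logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
  Seq()
}
```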



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-08 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73971757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+// TODO: Validate the value
+val value = PartitioningUtils.unescapePathName(ps(1))
--- End diff --

Can this unescaping cause problems in (say) S3 for objects of the form 
"foo%20bar"?



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-08 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73962887
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
--- End diff --

Cool. Can we make it explicit, e.g. `statuses.par(evalTaskSupport)`?





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73945744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+val value = PartitioningUtils.unescapePathName(ps(1))
--- End diff --

We could have a TODO here.





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73945656
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+val value = PartitioningUtils.unescapePathName(ps(1))
+// comparing with case-insensitive, but preserve the case
+if (columnName == partitionNames(0)) {
--- End diff --

I think it's valid.



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73800495
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
--- End diff --

A new one is created here: 
https://github.com/apache/spark/pull/14500/files#diff-54979ed5797b4a6193cf663dc23baca5R490





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73780390
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+val value = PartitioningUtils.unescapePathName(ps(1))
+// comparing with case-insensitive, but preserve the case
+if (columnName == partitionNames(0)) {
--- End diff --

A directory name like "a=" will pass this condition and produce an empty partition value.
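
A minimal REPL sketch of the concern (plain Scala, not the PR's code; the guard at the end is only a hypothetical way to reject such directories):

    val name = "a="
    val ps = name.split("=", 2)          // Array("a", "") -- the trailing "=" keeps an empty element
    val columnName = ps(0).toLowerCase   // "a"
    val value = ps(1)                    // ""  -> the partition value ends up empty
    // Hypothetical guard (an assumption, not what the PR does): skip directories
    // whose value part is empty.
    val looksValid = value.nonEmpty
    println(s"column=$columnName value='$value' valid=$looksValid")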



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73780357
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
+  val name = st.getPath.getName
+  if (st.isDirectory && name.contains("=")) {
+val ps = name.split("=", 2)
+val columnName = 
PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+val value = PartitioningUtils.unescapePathName(ps(1))
--- End diff --

Do we need to check whether the value is valid? E.g., for a partition column "a" of IntegerType, "a=abc" is invalid.
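
For illustration, a REPL sketch of one way such a check could look (an assumption, not part of the PR; valueMatchesType and the type names here are hypothetical):

    import scala.util.Try

    // Does the string value parse for the partition column's type?
    def valueMatchesType(value: String, typeName: String): Boolean = typeName match {
      case "int"    => Try(value.toInt).isSuccess
      case "bigint" => Try(value.toLong).isSuccess
      case "double" => Try(value.toDouble).isSuccess
      case _        => true   // string-like types accept anything in this sketch
    }

    valueMatchesType("abc", "int")   // false -> a directory "a=abc" would be rejected
    valueMatchesType("42", "int")    // true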



[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73780281
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
 testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+  testRecoverPartitions()
+}
+  }
+
+  test("after table: recover partition (parallel)") {
--- End diff --

after -> alter





[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73775993
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+tableName: TableIdentifier,
+cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends 
RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+val catalog = spark.sessionState.catalog
+if (!catalog.tableExists(tableName)) {
+  throw new AnalysisException(s"Table $tableName in $cmd does not 
exist.")
+}
+val table = catalog.getTableMetadata(tableName)
+if (catalog.isTemporaryTable(tableName)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on temporary tables: $tableName")
+}
+if (DDLUtils.isDatasourceTable(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd on datasource tables: $tableName")
+}
+if (table.tableType != CatalogTableType.EXTERNAL) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on external tables: 
$tableName")
+}
+if (!DDLUtils.isTablePartitioned(table)) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
+}
+if (table.storage.locationUri.isEmpty) {
+  throw new AnalysisException(
+s"Operation not allowed: $cmd only works on table with location 
provided: $tableName")
+}
+
+val root = new Path(table.storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+// Dummy jobconf to get to the pathFilter defined in configuration
+// It's very expensive to create a 
JobConf(ClassUtil.findContainingJar() is slow)
+val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
+val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+val partitionSpecsAndLocs = scanPartitions(
+  spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
+val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+  // inherit table storage format (possibly except for location)
+  CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+}
+spark.sessionState.catalog.createPartitions(tableName,
+  parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new 
ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+  spark: SparkSession,
+  fs: FileSystem,
+  filter: PathFilter,
+  path: Path,
+  spec: TablePartitionSpec,
+  partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+if (partitionNames.length == 0) {
+  return Seq(spec -> path)
+}
+
+val statuses = fs.listStatus(path)
+val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+val parArray = statuses.par
--- End diff --

I didn't look carefully, but if you are using the default execution context, please create a new one; otherwise it'd block.
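
A REPL sketch of the suggestion (assumptions: Scala 2.11 as used by Spark at the time, a stand-in list instead of the real fs.listStatus result, and a pool size of 8 mirroring the PR's evalTaskSupport): give the parallel collection its own pool via tasksupport so the listing work does not run on, or block, the default global pool.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    // Stand-in for fs.listStatus(path): one entry per partition directory.
    val statuses = (1 to 100).map(i => s"a=$i")
    val statusPar = statuses.par
    // Dedicated pool instead of the default task support.
    statusPar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    val specs = statusPar.map { name =>
      val Array(col, value) = name.split("=", 2)
      col.toLowerCase -> value
    }
    println(specs.size)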






[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

2016-08-05 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/14500#discussion_r73753834
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
 testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+  testRecoverPartitions()
+}
+  }
+
+  test("after table: recover partition (parallel)") {
+withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+  testRecoverPartitions()
+}
+  }
+
+  private def testRecoverPartitions() {
+val catalog = spark.sessionState.catalog
+// table to alter does not exist
+intercept[AnalysisException] {
+  sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+}
+
+val tableIdent = TableIdentifier("tab1")
+createTable(catalog, tableIdent)
+val part1 = Map("a" -> "1", "b" -> "5")
+createTablePartition(catalog, part1, tableIdent)
+assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == 
Set(part1))
+
+val part2 = Map("a" -> "2", "b" -> "6")
+val root = new 
Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+fs.mkdirs(new Path(new Path(root, "a=2"), "b=6"))
+try {
+  sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+  assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+Set(part1, part2))
+} finally {
+  fs.delete(root, true)
+}
+  }
--- End diff --

done

