[GitHub] spark issue #15235: [SPARK-17661][SQL] Consolidate various listLeafFiles imp...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15235 @brkyvz does this look good? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15235: [SPARK-17661][SQL] Consolidate various listLeafFiles imp...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15235 I pushed a new version that should address all the outstanding issues.
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80594396 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +85,185 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { + + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play --- End diff -- Moved
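The new `listLeafFiles` in this diff only decides where the listing runs: with at least `parallelPartitionDiscoveryThreshold` paths it calls `listLeafFilesInParallel`, otherwise `listLeafFilesInSerial`, and it then wraps the result in a `mutable.LinkedHashSet`, which drops duplicates while keeping the listing order. A minimal sketch of that dispatch, assuming plain strings instead of Hadoop `Path`/`FileStatus` and hypothetical `serial`/`parallel` functions passed in as parameters (these are not the real Spark signatures):

```scala
import scala.collection.mutable

object ListingDispatchSketch {
  // Hypothetical stand-in for ListingFileCatalog.listLeafFiles: `serial` and
  // `parallel` play the roles of listLeafFilesInSerial / listLeafFilesInParallel.
  def listLeafFiles(
      paths: Seq[String],
      threshold: Int,
      serial: Seq[String] => Seq[String],
      parallel: Seq[String] => Seq[String]): mutable.LinkedHashSet[String] = {
    // Same comparison as the diff: at or above the threshold goes parallel.
    val files =
      if (paths.length >= threshold) parallel(paths) else serial(paths)
    // LinkedHashSet removes duplicates but keeps first-seen order.
    mutable.LinkedHashSet(files: _*)
  }
}
```

Keeping the choice of strategy in one small method is what lets both listing implementations live side by side in the companion object.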
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80590071 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +85,185 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { + + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch { --- End diff -- Let me take a look at this. 
This is actually consistent with the old code (for the parallel version). It is actually slightly tricky to remove this.
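For reference, the SPARK-17599 handling quoted in this diff turns a missing root path into `None` (with a warning) instead of an exception, and the doc comment says the paths are then listed recursively. A rough sketch of that behaviour against a hypothetical in-memory file system; `FakeFs`, `Status`, and `println` standing in for `logWarning` are all illustrative assumptions, not Spark or Hadoop APIs, and the recursion is a guess at the part of the method the diff truncates:

```scala
import java.io.FileNotFoundException

object MissingPathSketch {
  // Hypothetical file status: just a path and a directory flag.
  final case class Status(path: String, isDir: Boolean)

  // Hypothetical in-memory "file system": directory -> children; `files` are leaves.
  final class FakeFs(dirs: Map[String, Seq[String]], files: Set[String]) {
    def getFileStatus(p: String): Status =
      if (dirs.contains(p)) Status(p, isDir = true)
      else if (files.contains(p)) Status(p, isDir = false)
      else throw new FileNotFoundException(p)

    def listStatus(p: String): Seq[Status] =
      dirs.getOrElse(p, throw new FileNotFoundException(p)).map(getFileStatus)
  }

  // Same shape as the diff: a missing root path is logged and skipped
  // rather than failing the whole listing.
  def listLeafFiles(fs: FakeFs, paths: Seq[String]): Seq[Status] =
    paths.flatMap { path =>
      val status: Option[Status] =
        try Option(fs.getFileStatus(path))
        catch {
          case _: FileNotFoundException =>
            println(s"WARN: The directory $path was not found. Was it deleted very recently?")
            None
        }
      status.toSeq.flatMap(leaves(fs, _))
    }

  // Recursively expand directories down to their leaf files.
  private def leaves(fs: FakeFs, s: Status): Seq[Status] =
    if (s.isDir) fs.listStatus(s.path).flatMap(leaves(fs, _)) else Seq(s)
}
```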
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80588749 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.SparkFunSuite + +class ListingFileCatalogSuite extends SparkFunSuite { + + test("file filtering") { --- End diff -- That's counterintuitive. The code is defined in ListingFileCatalog, and should be tested in ListingFileCatalogSuite.
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80588596 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +85,185 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { + + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. 
+ */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch { +case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + None + } -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of
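The removed `childStatuses.map` block quoted above keeps an existing `LocatedFileStatus` unchanged; in the old code, any other status gets its block locations attached through `getFileBlockLocations`, except directories, which skip the lookup entirely (SPARK-14959). A sketch of that per-status decision, assuming hypothetical `Plain`/`Located` case classes in place of Hadoop's `FileStatus`/`LocatedFileStatus` and an injected `lookup` function in place of `fs.getFileBlockLocations`:

```scala
object BlockLocationSketch {
  // Hypothetical stand-ins for Hadoop's FileStatus and LocatedFileStatus.
  sealed trait Status { def path: String; def isDirectory: Boolean }
  final case class Plain(path: String, isDirectory: Boolean, len: Long) extends Status
  final case class Located(path: String, isDirectory: Boolean, blocks: Seq[(Long, Long)])
      extends Status

  // `lookup` plays the role of fs.getFileBlockLocations(f, 0, f.getLen),
  // returning (offset, length) pairs.
  def toLocated(statuses: Seq[Status], lookup: Plain => Seq[(Long, Long)]): Seq[Status] =
    statuses.map {
      case f: Located => f                // already carries locations: keep as-is
      case f: Plain if f.isDirectory => f // directories skip the lookup (SPARK-14959)
      case f: Plain => Located(path = f.path, isDirectory = false, blocks = lookup(f))
    }
}
```

Injecting `lookup` makes it easy to check that directories never trigger a block-location call.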
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80588669 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +85,185 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { + + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. 
+ */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch { +case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + None + } -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of
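The `equals`/`hashCode` overrides in this diff compare catalogs by the set of root paths, so path order and repeated paths do not matter, and equal sets always produce equal hash codes. A minimal sketch of the same contract, using a hypothetical `CatalogSketch` class with string paths:

```scala
// Hypothetical catalog identified only by its root paths, mirroring the
// overrides in the diff: paths are compared as a Set.
final class CatalogSketch(val paths: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case c: CatalogSketch => paths.toSet == c.paths.toSet
    case _ => false
  }

  // Derived from the same Set, so equal catalogs always share a hash code.
  override def hashCode(): Int = paths.toSet.hashCode()
}
```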
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80588655 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +85,185 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { + + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. 
+ */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch { +case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + None + } -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of
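In both versions the path filter comes from `FileInputFormat.getInputPathFilter(jobConf)`, and the old code's `pathFilter != null` check indicates that it can be null when no filter is configured. A small sketch of applying such a possibly-null filter, assuming a hypothetical `PathFilterLike` trait in place of Hadoop's `PathFilter` and string paths in place of `FileStatus`:

```scala
object PathFilterSketch {
  // Hypothetical stand-in for org.apache.hadoop.fs.PathFilter.
  trait PathFilterLike { def accept(path: String): Boolean }

  // Wrap the possibly-null filter in Option: no filter means keep everything,
  // matching `if (pathFilter != null) stats.filter(...) else stats`.
  def applyFilter(stats: Seq[String], filter: PathFilterLike): Seq[String] =
    Option(filter) match {
      case Some(f) => stats.filter(f.accept)
      case None => stats
    }
}
```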
[GitHub] spark issue #15235: [SPARK-17661][SQL] Consolidate various listLeafFiles imp...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15235 @brkyvz I think this also impacts the change you just made in https://github.com/apache/spark/pull/15153. This change makes both code paths consistent.
[GitHub] spark issue #15153: [SPARK-17599] Prevent ListingFileCatalog from failing if...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15153 @brkyvz the change here only affects the serial version and not the parallel version, doesn't it? Wouldn't that be a problem?
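One way to keep the serial and parallel listings from drifting apart, as raised here, is to route both through a single per-path helper that owns the missing-path policy. A sketch of that idea, assuming a hypothetical `Lister` function; the `distributed` method only splits the paths into `grouped` batches on one thread as a stand-in for the Spark job, and does not model executors or serialization:

```scala
import java.io.FileNotFoundException

object SharedListingSketch {
  // Hypothetical per-path lister: returns the files under a path, or throws
  // FileNotFoundException if the path has disappeared.
  type Lister = String => Seq[String]

  // The single place that decides what a missing path means (here: skip it),
  // so both entry points below behave the same way.
  private def listOne(lister: Lister, path: String): Seq[String] =
    try lister(path)
    catch { case _: FileNotFoundException => Seq.empty }

  def serial(lister: Lister, paths: Seq[String]): Seq[String] =
    paths.flatMap(listOne(lister, _))

  // Single-threaded stand-in for the distributed job: split the paths into
  // roughly `numTasks` batches and run the same helper on each batch.
  def distributed(lister: Lister, paths: Seq[String], numTasks: Int): Seq[String] = {
    val perTask = math.max(1, math.ceil(paths.length.toDouble / numTasks).toInt)
    paths.grouped(perTask).toSeq.flatMap(batch => batch.flatMap(listOne(lister, _)))
  }
}
```

Because the error handling lives only in `listOne`, a fix like SPARK-17599 cannot land in one code path and miss the other.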
[GitHub] spark issue #14696: [SPARK-16714][SQL] Refactor type widening for consistenc...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14696 Closing this for now until I revisit.
[GitHub] spark issue #15123: [SPARK-17551][SQL] Add DataFrame API for null ordering
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15123 cc @hvanhovell
[GitHub] spark pull request #14696: [SPARK-16714][SQL] Refactor type widening for con...
Github user petermaxlee closed the pull request at: https://github.com/apache/spark/pull/14696
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80375090 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.SparkFunSuite + +class ListingFileCatalogSuite extends SparkFunSuite { + + test("file filtering") { --- End diff -- This was moved from HadoopFsRelationSuite without any changes.
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80375080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +83,177 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => -if (f.isDirectory ) { - // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959). - f -} else { - HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. 
+ private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(ha
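The comment in this diff explains why `SerializableFileStatus` exists: `FileStatus` cannot be Java-serialized, so the executor copies its key fields into a case class and the driver rebuilds a `FileStatus` from them. A cut-down sketch of that round trip, assuming a hypothetical `FakeFileStatus` class and only four of the fields (the real case class also carries block replication, block size, access time, and block locations):

```scala
object SerializableStatusSketch {
  // Hypothetical stand-in for Hadoop's FileStatus: a plain class that does
  // NOT implement java.io.Serializable, so it cannot be shipped as-is.
  final class FakeFileStatus(
      val path: String,
      val length: Long,
      val isDir: Boolean,
      val modificationTime: Long)

  // Case classes are Serializable, so copying the needed fields into one
  // yields something that can cross the executor -> driver boundary.
  final case class SerializableFileStatus(
      path: String,
      length: Long,
      isDir: Boolean,
      modificationTime: Long)

  // Executor side: extract the key components.
  def toSerializable(s: FakeFileStatus): SerializableFileStatus =
    SerializableFileStatus(s.path, s.length, s.isDir, s.modificationTime)

  // Driver side: rebuild the original type from the shipped fields.
  def fromSerializable(s: SerializableFileStatus): FakeFileStatus =
    new FakeFileStatus(s.path, s.length, s.isDir, s.modificationTime)
}
```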
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80375086 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +83,177 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => -if (f.isDirectory ) { - // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959). - f -} else { - HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. 
+ private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(ha
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80375072 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +83,177 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => -if (f.isDirectory ) { - // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959). - f -} else { - HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. 
+ private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of path recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { +// Dummy jobconf to get to the pathFilter defined in configuration +val jobConf = new JobConf(hadoopConf, this.getClass) +val filter = FileInputFormat.getInputPathFilter(jobConf) + +paths.flatMap { path => + logTrace(s"Listing $path") + val fs = path.getFileSystem(ha
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15235#discussion_r80375056 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala --- @@ -82,73 +83,177 @@ class ListingFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { -if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) -} else { - // Right now, the number of paths is less than the value of - // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver. - // If there is any child that has more files than the threshold, we will use parallel - // listing. - - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses: Seq[FileStatus] = paths.flatMap { path => -val fs = path.getFileSystem(hadoopConf) -logTrace(s"Listing $path on driver") - -val childStatuses = { - try { -val stats = fs.listStatus(path) -if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } catch { -case _: FileNotFoundException => - logWarning(s"The directory $path was not found. 
Was it deleted very recently?") - Array.empty[FileStatus] - } -} +val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { +ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { +ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + +mutable.LinkedHashSet(files: _*) + } + + override def equals(other: Any): Boolean = other match { +case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet +case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} + + +object ListingFileCatalog extends Logging { -childStatuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => -if (f.isDirectory ) { - // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959). - f -} else { - HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play + // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`. + // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize + // it from executor side and reconstruct it on driver side. + private case class SerializableBlockLocation( --- End diff -- I renamed this from "Fake" to "Serializable" to more accurately describe its purpose.
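The diff comment above explains the workaround. Hadoop's `FileStatus` is `Writable` but not Java-serializable, so the patch copies its key fields into `SerializableFileStatus` on the executor and rebuilds a `FileStatus` on the driver.

The following is a minimal, stdlib-only sketch of the same idea. `LegacyStatus`, `SerializableStatus`, and `StatusCodec` are hypothetical names. `LegacyStatus` only stands in for `FileStatus`: it keeps four of its fields and has no Hadoop dependency.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for Hadoop's FileStatus: a plain class that does NOT
// implement java.io.Serializable, so Java serialization rejects it.
class LegacyStatus(val path: String, val length: Long, val isDir: Boolean, val modificationTime: Long)

// Case classes are Serializable by default, so copying the key fields into one
// lets the data cross the executor -> driver boundary.
final case class SerializableStatus(path: String, length: Long, isDir: Boolean, modificationTime: Long)

object StatusCodec {
  // Executor side: extract the fields that must travel.
  def toSerializable(s: LegacyStatus): SerializableStatus =
    SerializableStatus(s.path, s.length, s.isDir, s.modificationTime)

  // Driver side: rebuild the original type from the copied fields.
  def fromSerializable(s: SerializableStatus): LegacyStatus =
    new LegacyStatus(s.path, s.length, s.isDir, s.modificationTime)

  // Java-serialize an object; throws NotSerializableException for unsupported types.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Case classes get `java.io.Serializable` for free, so the wrapper approach needs no custom serialization code. The trade-off is that every field the driver needs has to be copied explicitly.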
[GitHub] spark issue #15235: [SPARK-17661][SQL] Consolidate various listLeafFiles imp...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15235 @yhuai I think you wrote most of this. Can you take a look?
[GitHub] spark pull request #15235: [SPARK-17661][SQL] Consolidate various listLeafFi...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/15235 [SPARK-17661][SQL] Consolidate various listLeafFiles implementations ## What changes were proposed in this pull request? There are 4 listLeafFiles-related functions in Spark: - ListingFileCatalog.listLeafFiles (which calls HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in reaches a threshold; if it is lower, then it has its own serial version implemented) - HadoopFsRelation.listLeafFiles (called only by HadoopFsRelation.listLeafFilesInParallel) - HadoopFsRelation.listLeafFilesInParallel (called only by ListingFileCatalog.listLeafFiles) This is confusing and error-prone because there are effectively two distinct implementations of the serial version of listing leaf files. This code can be improved by: - Moving all file listing code into ListingFileCatalog, since it is the only class that needs it. - Keeping only one function for listing files in serial. ## How was this patch tested? This change should be covered by existing unit and integration tests. I also moved a test case for HadoopFsRelation.shouldFilterOut from HadoopFsRelationSuite to ListingFileCatalogSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17661 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15235.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15235 commit 2a76ec1da54fd19f5c8eb621ee5c823f69efe855 Author: petermaxlee Date: 2016-09-25T06:29:07Z [SPARK-17661][SQL] Consolidate various listLeafFiles implementations
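This PR aims for one serial implementation, plus a parallel path that is chosen when `paths.length >= parallelPartitionDiscoveryThreshold`, with the result collected into a `mutable.LinkedHashSet`. That shape can be sketched without Spark or Hadoop.

This is a hypothetical, stdlib-only model built on three substitutions:
- `java.io.File` replaces Hadoop's `Path`/`FileStatus`.
- Scala `Future`s replace the Spark job behind `listLeafFilesInParallel`.
- `threshold` is a plain parameter rather than the session config.

```scala
import java.io.File
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object LeafLister {
  // The single serial implementation: recursively collect regular files under each path.
  // listFiles() returns null for a missing directory, which is treated as empty
  // (compare the "was it deleted very recently?" warning in the original code).
  def listLeafFilesInSerial(paths: Seq[File]): Seq[File] =
    paths.flatMap { path =>
      val children = Option(path.listFiles()).map(_.toSeq.sortBy(_.getName)).getOrElse(Seq.empty[File])
      val (dirs, files) = children.partition(_.isDirectory)
      files ++ listLeafFilesInSerial(dirs)
    }

  // Stand-in for the distributed listing: one Future per top-level path, and every
  // task reuses the same serial implementation instead of a second copy of it.
  def listLeafFilesInParallel(paths: Seq[File]): Seq[File] = {
    val tasks = paths.map(p => Future(listLeafFilesInSerial(Seq(p))))
    Await.result(Future.sequence(tasks), Duration.Inf).flatten
  }

  // Dispatch on the number of input paths, then de-duplicate while keeping order.
  def listLeafFiles(paths: Seq[File], threshold: Int): mutable.LinkedHashSet[File] = {
    val files =
      if (paths.length >= threshold) listLeafFilesInParallel(paths)
      else listLeafFilesInSerial(paths)
    mutable.LinkedHashSet(files: _*)
  }
}
```

The benefit of the consolidation shows up in `listLeafFilesInParallel`: each parallel task calls the same serial function, so filtering and missing-directory handling live in exactly one place.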
[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15166#discussion_r79730198 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2 should have been purged. +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => +val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) +val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) +val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 --- End diff -- I think @frreiss added this to be more obvious. I don't really have a preference here.
[GitHub] spark pull request #15166: [SPARK-17513][SQL] Make StreamExecution garbage-c...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15166#discussion_r79730104 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbage collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run 3 batches, and then assert that only 1 metadata file is left at the end +// since the first 2 should have been purged. +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + AssertOnQuery("metadata log should contain only one file") { q => +val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) +val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) +val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 +assert(toTest.size == 1 && toTest.head == "2") +true --- End diff -- It still fails. There was an assert there.
[GitHub] spark pull request #15166: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/15166 [SPARK-17513] [STREAMING] [SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235. This is a resubmission of #15126, which was based on work by frreiss in #15067 but fixed the test case along with some typos. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17513-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15166.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15166 commit 89d5deec63adbfdb7360c348deeb855ad4db89c2 Author: petermaxlee Date: 2016-09-20T05:19:51Z [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235. This is based on work by frreiss in #15067, but fixed the test case along with some typos. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. Author: petermaxlee Author: frreiss Closes #15126 from petermaxlee/SPARK-17513. commit 5e6113c941c37701c4e300d4ce3bb7821384529c Author: petermaxlee Date: 2016-09-20T18:41:45Z Fix test
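The test in this PR expects that after three batches only the metadata file named `2` remains, ignoring `.crc` side files (the SPARK-17475 workaround). A hypothetical, simplified directory-backed log can show the purge-after-commit pattern that this expectation implies: after writing batch N, delete every batch strictly older than N.

`DirMetadataLog` is an illustrative name, not Spark's `HDFSMetadataLog`. The exclusive threshold in `purge` is an assumption chosen to match the test's expected file.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.util.Try

// Hypothetical, simplified metadata log: batch N is stored in a file named "N".
// This is not Spark's HDFSMetadataLog; it only illustrates purge-after-commit.
final class DirMetadataLog(dir: File) {
  dir.mkdirs()

  def add(batchId: Long, metadata: String): Unit =
    Files.write(new File(dir, batchId.toString).toPath, metadata.getBytes(StandardCharsets.UTF_8))

  // Batch ids currently on disk; checksum side files such as ".2.crc" are ignored.
  def batchIds: Seq[Long] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
      .map(_.getName)
      .filterNot(_.endsWith(".crc"))
      .flatMap(name => Try(name.toLong).toOption)
      .sorted

  // Delete every batch strictly older than thresholdBatchId (the threshold itself is kept).
  def purge(thresholdBatchId: Long): Unit =
    batchIds.filter(_ < thresholdBatchId).foreach(id => new File(dir, id.toString).delete())
}
```

Calling `add(n, ...)` followed by `purge(n)` for batches 0, 1 and 2 leaves only batch 2 on disk. This assumes one batch in flight at a time: with overlapping batches, purging everything older than the newest one could discard metadata that is still needed.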
[GitHub] spark issue #15135: [pyspark][group]pyspark GroupedData can't apply agg func...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15135 Isn't it as simple as this?
```
from pyspark.sql import functions as F

cols = [x for x in df.columns if x != "key"]
# agg() takes Column expressions as varargs, so the list must be unpacked with *
df.groupby("key").agg(*([F.min(x) for x in cols] + [F.max(x) for x in cols]))
```
[GitHub] spark issue #15122: [SPARK-17569] Make StructuredStreaming FileStreamSource ...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15122 I looked into this. I think there are two ways to intercept calls to HDFS. The first way is slightly hacky but pretty simple. FileSystem.addFileSystemForTesting is a package-private method that can be used to inject a mock file system. You can create an implementation of FilterFileSystem and register it for the "file" scheme. Then all accesses to the local file system will go through your implementation. Of course, you can also use a mocking library to do that, but that is not as clean, since FilterFileSystem is already public API that you can extend. The second way is more robust and does not depend on any private APIs. Create an implementation of FilterFileSystem that wraps LocalFileSystem, e.g. call it MockFileSystem. MockFileSystem.getScheme should return "mockfs", and the class can be registered through the fs.mockfs.impl configuration key. You can then pass mockfs:// paths to structured streaming. This is probably the more robust, generic solution.
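The second approach relies on plain delegation. `FilterFileSystem` forwards every call to a wrapped `FileSystem`, so a subclass only overrides the calls it wants to intercept. Using the real classes requires `hadoop-common` on the classpath, so this is a stdlib-only analogue of the pattern. `SimpleFs`, `LocalFs`, `FilterFs`, and `MockFs` are hypothetical stand-ins, not Hadoop APIs. `MockFs` reports its own scheme, records every listing call, and can simulate a directory disappearing, which is the failure that deleting a file on purpose would exercise.

```scala
import java.io.{File, FileNotFoundException}
import scala.collection.mutable

// Hypothetical minimal file-system interface (a tiny stand-in for Hadoop's FileSystem).
trait SimpleFs {
  def scheme: String
  def listStatus(path: String): Seq[String]
}

// Real local implementation backed by java.io.File.
class LocalFs extends SimpleFs {
  def scheme: String = "file"
  def listStatus(path: String): Seq[String] = {
    val children = new File(path).listFiles()
    if (children == null) throw new FileNotFoundException(path)
    children.map(_.getName).sorted.toSeq
  }
}

// Delegating wrapper in the spirit of FilterFileSystem: every call forwards to
// `underlying` unless a subclass overrides it.
class FilterFs(protected val underlying: SimpleFs) extends SimpleFs {
  def scheme: String = underlying.scheme
  def listStatus(path: String): Seq[String] = underlying.listStatus(path)
}

// Test double: its own scheme, a record of every listing, and simulated deletions.
class MockFs(inner: SimpleFs) extends FilterFs(inner) {
  val calls = mutable.ArrayBuffer.empty[String]
  val deleted = mutable.Set.empty[String]

  override def scheme: String = "mockfs"
  override def listStatus(path: String): Seq[String] = {
    calls += path
    if (deleted.contains(path)) throw new FileNotFoundException(s"$path (simulated deletion)")
    super.listStatus(path)
  }
}
```

Because `MockFs` only overrides `scheme` and `listStatus`, every other behavior comes from the wrapped implementation. That is what makes the delegation approach cleaner than mocking the whole interface.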
[GitHub] spark issue #15122: [SPARK-17569] Make StructuredStreaming FileStreamSource ...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15122 Can you test this by deleting the file on purpose and seeing what kind of exceptions are thrown?
[GitHub] spark issue #15127: [SPARK-17571][SQL] AssertOnQuery.condition should always...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15127 cc @tdas @zsxwing
[GitHub] spark pull request #15127: [SPARK-17571][SQL] AssertOnQuery.condition should...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/15127 [SPARK-17571][SQL] AssertOnQuery.condition should always return Boolean value ## What changes were proposed in this pull request? AssertOnQuery has two apply constructors: one that accepts a closure that returns Boolean, and another that accepts a closure that returns Unit. This is very confusing because developers could mistakenly think that AssertOnQuery always requires a Boolean return type and verifies the result, when in fact one of the constructors ignores the value of the last statement. This pull request makes the two constructors consistent, so that both always require a Boolean value. Overall, this makes the test suites more robust against developer errors. ## How was this patch tested? This is a test-only change. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17571 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15127.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15127 commit 008eb07a6e8e7a731c2b12b1d490c04dd1dfe3c0 Author: petermaxlee Date: 2016-09-17T07:35:46Z [SPARK-17571][SQL] AssertOnQuery.condition should always return Boolean value
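The pitfall described in this PR comes from Scala's value discarding. When a closure's expected result type is `Unit`, a trailing Boolean expression is computed and then thrown away. The hypothetical `UnitStyle` and `BoolStyle` factories below model the before and after shapes of the constructor. They are simplified stand-ins, not Spark's actual `AssertOnQuery` signatures. An `assert(...)` inside a Unit-style body still fails, because it throws instead of returning a value.

```scala
// Hypothetical, simplified model of the two factory styles (not Spark's code).
final case class Assertion[Q](message: String, condition: Q => Boolean) {
  // True if the check passes; an exception thrown inside `condition` propagates.
  def run(q: Q): Boolean = condition(q)
}

object UnitStyle {
  // Before: the body returns Unit, so the factory has to invent a result of `true`.
  // A trailing Boolean such as `names.size == 1` is silently discarded.
  def apply[Q](message: String)(body: Q => Unit): Assertion[Q] =
    Assertion(message, (q: Q) => { body(q); true })
}

object BoolStyle {
  // After: the body must produce the Boolean that is actually checked.
  def apply[Q](message: String)(condition: Q => Boolean): Assertion[Q] =
    Assertion(message, condition)
}
```

With three metadata files present, the Unit-style check below "passes" even though its condition is false. The Boolean-style check correctly fails.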
[GitHub] spark pull request #15126: [SPARK-17513][SQL] Make StreamExecution garbage-c...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/15126 [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata ## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17513 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15126.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15126 commit 8cc5e835b209d5796b044978ec4221ee22a8b9d2 Author: frreiss Date: 2016-09-08T19:59:15Z Added purge() call to scheduler commit d71366d958334ebbc81e45c7f469bad2a68d0a2d Author: frreiss Date: 2016-09-10T04:23:58Z Added test case and corrected off-by-one error. commit 82f5b681c2e8e52f8549b21c7d058c497f2fc809 Author: frreiss Date: 2016-09-10T04:24:35Z Merge branch 'master' of https://github.com/apache/spark into fred-16963a commit 6bd85faaf6cb5382f7c2a9d5a1b571281b8f6e62 Author: petermaxlee Date: 2016-09-17T06:49:37Z Merge pull request #15067 from frreiss/fred-16963a [SPARK-17513] [STREAMING] [SQL] Make StreamExecution garbage-collect its metadata commit 6b64d5cd222c70071faa7aebd8db191d5e1c0185 Author: petermaxlee Date: 2016-09-17T07:16:28Z Fix test case.
[GitHub] spark pull request #15067: [SPARK-17513] [STREAMING] [SQL] Make StreamExecut...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15067#discussion_r79278249 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala --- @@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + testQuietly("StreamExecution metadata garbarge collection") { +val inputData = MemoryStream[Int] +val mapped = inputData.toDS().map(6 / _) + +// Run a few batches through the application +testStream(mapped)( + AddData(inputData, 1, 2), + CheckAnswer(6, 3), + AddData(inputData, 1, 2), + CheckAnswer(6, 3, 6, 3), + AddData(inputData, 4, 6), + CheckAnswer(6, 3, 6, 3, 1, 1), + + // Three batches have run, but only one set of metadata should be present + AssertOnQuery( +q => { + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) + val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475 + toTest.size == 1 && toTest.head == "2" --- End diff -- Actually, this test case will always pass, even without the change here. I will submit a patch based on yours that fixes the issues here.
[GitHub] spark pull request #15123: [SPARK-17551][SQL] Add DataFrame API for null ord...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/15123#discussion_r79272091 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -325,6 +325,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(6)) } + test("sorting with null ordering") { +val data = Seq[java.lang.Integer](2, 1, null).toDF("key") --- End diff -- This is a much simpler test case than the one added in #15107.
[GitHub] spark issue #15123: [SPARK-17551][SQL] Add DataFrame API for null ordering
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/15123 This is based on https://github.com/apache/spark/pull/15107 but cleaned up a little bit.
[GitHub] spark pull request #15123: [SPARK-17551][SQL] Add DataFrame API for null ord...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/15123 [SPARK-17551][SQL] Add DataFrame API for null ordering ## What changes were proposed in this pull request? This pull request adds Scala/Java DataFrame API for null ordering (NULLS FIRST | LAST). ## How was this patch tested? Added a new test case in DataFrameSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17551 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15123.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15123 commit 69a9d8aacb3832f3e346a7b9dbb13dc522e4dc68 Author: Xin Wu Date: 2016-09-15T04:12:43Z complete the NULL ordering support in DataFrame APIs commit 90af6b87d6b677cd9b957c2faa18520e005d6001 Author: petermaxlee Date: 2016-09-17T00:26:06Z Merge pull request #15107 from xwu0226/null_order [SPARK-17551][SQL] complete the NULL ordering support in DataFrame APIs commit 02f5edf207e2d545a23f7c33d4f19ce8fc880041 Author: petermaxlee Date: 2016-09-17T00:58:11Z Updated.
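In SQL, `NULLS FIRST | LAST` is independent of the sort direction: nulls go first or last whether the non-null values ascend or descend. The sketch below illustrates that rule using only the standard library, on the same `Seq[java.lang.Integer](2, 1, null)` input as the new DataFrameSuite test. `NullOrdering.withNulls` is a hypothetical helper, and the sketch does not use the DataFrame API added in this PR.

```scala
object NullOrdering {
  // Wrap a base ordering so that nulls sort before (nullsFirst = true) or after
  // all non-null values. The base ordering only ever sees non-null elements.
  def withNulls[T <: AnyRef](base: Ordering[T], nullsFirst: Boolean): Ordering[T] =
    new Ordering[T] {
      def compare(x: T, y: T): Int =
        if (x == null && y == null) 0
        else if (x == null) (if (nullsFirst) -1 else 1)
        else if (y == null) (if (nullsFirst) 1 else -1)
        else base.compare(x, y)
    }

  // Ordering for boxed integers, the element type used in the DataFrameSuite test.
  val boxedInt: Ordering[java.lang.Integer] = Ordering.by[java.lang.Integer, Int](_.intValue())
}
```

The direction is applied to the base ordering before wrapping, for example `withNulls(boxedInt.reverse, nullsFirst = false)` for descending with nulls last. That way reversing the sort never moves the nulls.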
[GitHub] spark issue #14802: [SPARK-17235][SQL] Support purging of old logs in Metada...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14802 I can, but I'm doing a lot of work in this area, and splitting it up is much more difficult because the changes depend on one another. It would be better to merge the logically atomic pull requests.
[GitHub] spark issue #14802: [SPARK-17235][SQL] Support purging of old logs in Metada...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14802 @zsxwing yup, I plan to consolidate them.
[GitHub] spark pull request #14802: [SPARK-17235][SQL] Support purging of old logs in...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14802#discussion_r76191571 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala --- @@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - - def testManager(basePath: Path, fm: FileManager): Unit = { + /** Basic test case for [[FileManager]] implementation. */ + private def testFileManager(basePath: Path, fm: FileManager): Unit = { --- End diff -- I renamed this because I initially read it as a noun meaning "manager for testing" rather than "to test the file manager".
[GitHub] spark pull request #14802: [SPARK-17235][SQL] Support purging of old logs in...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14802 [SPARK-17235][SQL] Support purging of old logs in MetadataLog ## What changes were proposed in this pull request? This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required for a production structured streaming job that runs for a long period of time. ## How was this patch tested? Added a unit test case in HDFSMetadataLogSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17235 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14802.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14802 commit 0d9d1e6d59fb68996bf96b5238835a0718a8da1a Author: petermaxlee Date: 2016-08-25T07:11:47Z [SPARK-17235][SQL] Support purging of old logs in MetadataLog
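The purge semantics described in this pull request can be illustrated with an in-memory model. `InMemoryMetadataLog` below is a hypothetical stand-in, not a Spark class. The sketch assumes that `purge(thresholdBatchId)` drops every batch whose id is strictly less than the threshold and keeps the threshold batch itself. HDFSMetadataLog writes one file per batch, so its purge would presumably delete files rather than map entries.

```scala
import scala.collection.mutable

// Hypothetical in-memory metadata log keyed by batch id; illustrates purge only.
class InMemoryMetadataLog[T] {
  // Sorted by batch id so the latest batch is simply the last entry.
  private val batches = mutable.TreeMap.empty[Long, T]

  /** Stores metadata for a batch; returns false if that batch id was already written. */
  def add(batchId: Long, metadata: T): Boolean = synchronized {
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }
  }

  def get(batchId: Long): Option[T] = synchronized { batches.get(batchId) }

  /** The most recent (batchId, metadata) pair, if any. */
  def getLatest(): Option[(Long, T)] = synchronized { batches.lastOption }

  /** Removes every entry whose batch id is strictly less than thresholdBatchId. */
  def purge(thresholdBatchId: Long): Unit = synchronized {
    batches.keys.filter(_ < thresholdBatchId).toList.foreach(batches.remove)
  }
}
```

A long-running query could call `purge(latestBatchId - k)` to keep only the last `k` batches. The retention policy (`k` here) is an assumption and is not part of this patch.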
[GitHub] spark issue #14802: [SPARK-17235][SQL] Support purging of old logs in Metada...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14802 @tdas and @zsxwing, can you take a look at this? It's a pretty simple change.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r76108686 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -41,36 +40,59 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { + import FileStreamSource._ + + private val sourceOptions = new FileStreamOptions(options) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns - private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) + + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) + --- End diff -- metadataLog is not logically related to qualifiedBasePath.
[GitHub] spark issue #14722: [SPARK-13286] [SQL] add the next expression of SQLExcept...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14722 Thanks for the update. Can you explain the comment about "especially in batch mode" in more detail? Were you referring to batch vs. streaming? What makes this more difficult in batch mode than in streaming?
[GitHub] spark issue #14728: [SPARK-17165][SQL] FileStreamSource should not track the...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14728 Re: format compatibility. My understanding is that structured streaming is not yet production-ready, so the cross-version compatibility guarantee doesn't really apply. Otherwise, we shouldn't use Java serialization here at all and should opt for an explicit format such as JSON or protobuf.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r76096158 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -41,36 +40,59 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { + import FileStreamSource._ + + private val sourceOptions = new FileStreamOptions(options) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns - private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) + + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) + private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ - private val maxFilesPerBatch = getMaxFilesPerBatch() + private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + + /** A mapping from a file that we have processed to some timestamp it was last modified. */ + // Visible for testing. + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) --- End diff -- This is actually useful in general for debugging. I'm going to update the comment.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r76095989 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -41,36 +40,59 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { + import FileStreamSource._ + + private val sourceOptions = new FileStreamOptions(options) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns - private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) + + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) + private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ - private val maxFilesPerBatch = getMaxFilesPerBatch() + private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + + /** A mapping from a file that we have processed to some timestamp it was last modified. */ + // Visible for testing. + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) - private val seenFiles = new OpenHashSet[String] - metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => -files.foreach(seenFiles.add) + metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) => +entry.foreach(seenFiles.add) +seenFiles.purge() --- End diff -- Good idea. Let me add it to the option class.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r76095443 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -41,36 +40,59 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { + import FileStreamSource._ + + private val sourceOptions = new FileStreamOptions(options) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns - private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) + + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) + --- End diff -- There is only one blank line. It's actually a good idea to have blank lines separating variable definitions, as recommended by many coding style guides. Excessive blank lines are bad, though.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r76095002 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -41,36 +40,59 @@ class FileStreamSource( metadataPath: String, options: Map[String, String]) extends Source with Logging { + import FileStreamSource._ + + private val sourceOptions = new FileStreamOptions(options) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns - private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) + + private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath) + private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) /** Maximum number of new files to be considered in each batch */ - private val maxFilesPerBatch = getMaxFilesPerBatch() + private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + + /** A mapping from a file that we have processed to some timestamp it was last modified. */ + // Visible for testing. + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) - private val seenFiles = new OpenHashSet[String] - metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => -files.foreach(seenFiles.add) + metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) => +entry.foreach(seenFiles.add) +seenFiles.purge() --- End diff -- This is not safe until we truncate the logs.
[GitHub] spark issue #14722: [SPARK-13286] [SQL] add the next expression of SQLExcept...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14722 @davies, can you explain in more detail? I looked into this and am not 100% sure what your description refers to. Is the original exception currently logged? I also think we might need to update the pull request to traverse every exception in the cause chain and log each one, just in case.
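Traversing every chained exception amounts to following `getCause` until it returns null. The sketch below shows one way to collect the chain. `ExceptionChain.causes` is a hypothetical helper, not existing Spark code. It also guards against a cause chain that loops back on itself, which is rare but possible because causes can be set after construction.

```scala
import java.util.{Collections, IdentityHashMap}

// Hypothetical helper: walks an exception's cause chain so each link can be logged.
object ExceptionChain {
  /** Returns `t` followed by each of its causes, stopping early if a cause repeats. */
  def causes(t: Throwable): List[Throwable] = {
    // Identity-based set, because Throwable equality is reference equality anyway
    // and we only want to detect revisiting the same object.
    val seen = Collections.newSetFromMap(new IdentityHashMap[Throwable, java.lang.Boolean]())
    val buf = List.newBuilder[Throwable]
    var current = t
    while (current != null && seen.add(current)) {
      buf += current
      current = current.getCause
    }
    buf.result()
  }
}
```

A caller could then log each element, for example `ExceptionChain.causes(e).foreach(c => println(c))`, in place of whatever logger the real code uses.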
[GitHub] spark issue #14728: [SPARK-17165][SQL] FileStreamSource should not track the...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14728 I've updated the default and set it to 1 week.
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r75795996 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -17,21 +17,18 @@ package org.apache.spark.sql.execution.streaming -import scala.util.Try +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.OpenHashSet /** - * A very simple source that reads text files from the given directory as they appear. - * - * TODO Clean up the metadata files periodically --- End diff -- Put it back. Also updated the file with a new test case to make the seen map more robust.
[GitHub] spark issue #14732: [SPARK-16320] [DOC] Document G1 heap region's effect on ...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14732 Looks good!
[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14731#discussion_r75583457 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -293,8 +290,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( } /** Get file mod time from cache or fetch it from the file system */ - private def getFileModTime(path: Path) = { -fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) + private def getFileModTime(fs: FileStatus) = { --- End diff -- Should we just remove this function now?
[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14731#discussion_r75583446 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -241,16 +233,21 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). * Hence they can get selected as new files again. To prevent this, files whose mod time is more * than current batch time are not considered. + * @param fs file status + * @param currentTime time of the batch + * @param modTimeIgnoreThreshold the ignore threshold + * @return true if the file has been modified within the batch window */ - private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + private def isNewFile(fs: FileStatus, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { --- End diff -- Also, fs is a confusing name here, because in this code it usually refers to a FileSystem. We should pick a different name.
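Here is a simplified sketch of the new-file check documented in this diff. It takes the listing's status object directly and names the parameter `status` rather than `fs`, as suggested above. `FileInfo` is a hypothetical stand-in for Hadoop's `FileStatus`, and `recentlySelected` models the set of files picked in recent batches. The exact boundary comparisons (at-or-before the ignore threshold, strictly after the batch time) are assumptions, not a copy of FileInputDStream.

```scala
// Hypothetical stand-in for Hadoop's FileStatus: only the fields this check needs.
final case class FileInfo(path: String, modificationTime: Long)

object NewFileCheck {
  /**
   * Sketch of "is this file new for the current batch?", assuming:
   *  - files modified at or before modTimeIgnoreThreshold are too old to consider,
   *  - files modified after currentTime belong to a later batch,
   *  - files already selected in a recent batch are skipped.
   */
  def isNewFile(
      status: FileInfo,
      currentTime: Long,
      modTimeIgnoreThreshold: Long,
      recentlySelected: Set[String]): Boolean = {
    // The mod time comes from the listing result, so no extra per-file lookup is needed.
    val modTime = status.modificationTime
    if (modTime <= modTimeIgnoreThreshold) false
    else if (modTime > currentTime) false
    else !recentlySelected.contains(status.path)
  }
}
```

Passing the status from the directory listing avoids a second metadata request per file. That appears to be the motivation for replacing `getFileModTime(path)` in this change.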
[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14731#discussion_r75583436 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -241,16 +233,21 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). * Hence they can get selected as new files again. To prevent this, files whose mod time is more * than current batch time are not considered. + * @param fs file status + * @param currentTime time of the batch + * @param modTimeIgnoreThreshold the ignore threshold + * @return true if the file has been modified within the batch window */ - private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + private def isNewFile(fs: FileStatus, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { --- End diff -- The indentation is wrong here.
[GitHub] spark issue #14728: [SPARK-17165][SQL] FileStreamSource should not track the...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14728 cc @rxin @cloud-fan
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14728 [SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely ## What changes were proposed in this pull request? Before this change, FileStreamSource used an in-memory hash set to track the list of files processed by the engine. The list can grow indefinitely, leading to OOM or overflow of the hash set. This patch introduces a new user-defined option called "maxFileAge", defaulting to 24 hours. If a file is older than this age, FileStreamSource will purge it from the in-memory map used to track the files that have been processed. ## How was this patch tested? Added unit tests for the underlying utility, and also added an end-to-end test to validate the purge in FileStreamSourceSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17165 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14728.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14728 commit ce1dd9c4d6a880751e7b3692f9db8597a88f2d05 Author: petermaxlee Date: 2016-08-20T06:35:57Z [SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely
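An age-bounded seen-files map can be sketched in a few lines. `SeenFilesMapSketch` is hypothetical; the patch's own class is SeenFilesMap, whose code is not shown here. The sketch assumes timestamps are modification times in milliseconds and that `purge()` drops entries older than the newest timestamp seen minus `maxAgeMs`. One detail matters for correctness: once an entry is purged, the file would look "new" again. The sketch therefore also rejects any file older than the last purge horizon.

```scala
import scala.collection.mutable

/**
 * Hypothetical seen-files map bounded by file age (timestamps in milliseconds).
 * Entries older than (latest timestamp seen - maxAgeMs) are dropped on purge().
 */
class SeenFilesMapSketch(maxAgeMs: Long) {
  require(maxAgeMs >= 0, "maxAgeMs must be non-negative")

  private val map = mutable.HashMap.empty[String, Long]
  private var latestTimestamp: Long = 0L
  private var lastPurgeTimestamp: Long = 0L

  /** Records a processed file and its modification time. */
  def add(path: String, timestamp: Long): Unit = {
    map.put(path, timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  /** New only if not tracked and not older than the purge horizon (so purged files stay "seen"). */
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !map.contains(path)

  /** Drops entries older than the age threshold and returns how many were removed. */
  def purge(): Int = {
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val stale = map.collect { case (p, ts) if ts < lastPurgeTimestamp => p }.toList
    stale.foreach(map.remove)
    stale.size
  }

  def size: Int = map.size
}
```

Measuring age against the newest timestamp seen, rather than the wall clock, keeps the behavior deterministic for replayed input. This is a design assumption of the sketch.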
[GitHub] spark pull request #14728: [SPARK-17165][SQL] FileStreamSource should not tr...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14728#discussion_r75573771 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.Try + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap +import org.apache.spark.util.Utils + +/** + * User specified options for file streams. + */ +class FileStreamOptions(@transient private val parameters: Map[String, String]) --- End diff -- This is a similar setup to CSVOptions and JSONOptions. I felt it would be easier to track the list of options read by the source here.
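An options class in the style described above looks up keys case-insensitively and parses and validates every option the source reads in one place. The sketch below is hypothetical (`FileStreamOptionsSketch` is not the class in the diff). The option names come from this thread; the one-week default reflects the later comment in this thread. The diff imports Spark's Utils, which suggests the real class parses duration strings with a Spark helper. To stay self-contained, this sketch accepts `maxFileAge` only as plain milliseconds.

```scala
import java.util.Locale
import scala.util.Try

/** Hypothetical options holder: case-insensitive keys, each option parsed and validated once. */
class FileStreamOptionsSketch(parameters: Map[String, String]) {
  // Normalize keys so "maxFilesPerTrigger" and "maxfilespertrigger" are equivalent.
  private val params: Map[String, String] =
    parameters.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  /** Maximum number of new files per trigger, if set; must be a positive integer. */
  val maxFilesPerTrigger: Option[Int] = params.get("maxfilespertrigger").map { str =>
    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
    }
  }

  /** Maximum file age in milliseconds (plain number in this sketch); defaults to one week. */
  val maxFileAgeMs: Long = params.get("maxfileage").map { str =>
    Try(str.toLong).toOption.filter(_ >= 0).getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value '$str' for option 'maxFileAge', must be a non-negative number")
    }
  }.getOrElse(7L * 24 * 60 * 60 * 1000)
}
```

Because the fields are vals, invalid values fail fast when the source is constructed rather than at the first trigger.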
[GitHub] spark issue #14697: [SPARK-17124][SQL] RelationalGroupedDataset.agg should p...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14697 I updated the description.
[GitHub] spark issue #14697: [SPARK-17124][SQL] RelationalGroupedDataset.agg should p...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14697 For example, run both count and sum for a column. Let me update the description.
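The count-and-sum example shows why aggregate specs should be kept as an ordered sequence of (column, function) pairs. If the pairs are collapsed into a Map, two functions on the same column collide and only the last one survives, and the original order is lost. The sketch below uses only standard Scala collections and does not call Spark. `AggSpecs` and its column names are hypothetical.

```scala
object AggSpecs {
  // Aggregate specs as (column, function) pairs, e.g. count and sum over the same column.
  val specs: Seq[(String, String)] = Seq("value" -> "count", "value" -> "sum", "key" -> "max")

  // Collapsing into a Map keeps only one function per column (the last pair wins).
  val asMap: Map[String, String] = specs.toMap

  // Keeping the Seq preserves both the order and the duplicate column.
  def describe(pairs: Seq[(String, String)]): Seq[String] =
    pairs.map { case (col, fn) => s"$fn($col)" }
}
```

Here `AggSpecs.asMap` ends up with only two entries and has lost `count(value)`, while `describe(AggSpecs.specs)` yields all three aggregates in the order given.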
[GitHub] spark issue #14710: [SPARK-16533][CORE]
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14710 cc @vanzin and @kayousterhout
[GitHub] spark issue #14709: [SPARK-17150][SQL] Support SQL generation for inline tab...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14709 cc @cloud-fan and @hvanhovell
[GitHub] spark pull request #14709: [SPARK-17150][SQL] Support SQL generation for inl...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14709 [SPARK-17150][SQL] Support SQL generation for inline tables ## What changes were proposed in this pull request? This patch adds support for SQL generation for inline tables. With this, it becomes possible to create a view that depends on inline tables. ## How was this patch tested? Added a test case in LogicalPlanToSQLSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17150 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14709.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14709 commit a32cbc2eb418cd320978c4c8bce9e8240e1b165f Author: petermaxlee Date: 2016-08-19T04:10:17Z [SPARK-17150][SQL] Support SQL generation for inline tables
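SQL generation for an inline table renders the rows back into a VALUES clause with an alias and column list. That clause could then appear in a FROM clause of a view definition. The sketch below is hypothetical (`InlineTableSql` is not Spark's SQL builder), and its output shape is an assumption. It quotes identifiers with backticks, doubling any embedded backtick. String literals use single quotes with backslash escaping, which is also an assumption about the target dialect. Only integers, strings, booleans, and NULL are supported.

```scala
// Hypothetical renderer of an inline table back into SQL text.
object InlineTableSql {
  /** Quotes an identifier with backticks, doubling any embedded backtick. */
  def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

  /** Renders a literal; strings use single quotes with backslash escaping (assumed dialect). */
  def literal(value: Any): String = value match {
    case null      => "NULL"
    case s: String => "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"
    case b: Boolean => if (b) "true" else "false"
    case n @ (_: Int | _: Long | _: Short | _: Byte) => n.toString
    case other     => throw new IllegalArgumentException(s"Unsupported literal: $other")
  }

  /** Produces e.g. VALUES (1, 'a'), (2, NULL) AS `t`(`id`, `name`). */
  def toSql(alias: String, columns: Seq[String], rows: Seq[Seq[Any]]): String = {
    require(rows.nonEmpty, "an inline table needs at least one row")
    require(rows.forall(_.length == columns.length), "every row must match the column count")
    val values = rows.map(row => row.map(literal).mkString("(", ", ", ")")).mkString(", ")
    val cols = columns.map(quoteIdentifier).mkString(", ")
    s"VALUES $values AS ${quoteIdentifier(alias)}($cols)"
  }
}
```

Quoting every identifier, even simple ones, is a deliberate choice in this sketch. It keeps reserved words and unusual column names safe when the generated text is parsed again.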
[GitHub] spark issue #14697: [SPARK-17124][SQL] RelationalGroupedDataset.agg should p...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14697 cc @cloud-fan too
[GitHub] spark pull request #14708: [SPARK-17149][SQL] array.sql for testing array re...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14708#discussion_r75423714 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -161,7 +161,6 @@ object FunctionRegistry { val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map( // misc non-aggregate functions expression[Abs]("abs"), -expression[CreateArray]("array"), --- End diff -- These were not in the right place, so I moved them.
[GitHub] spark pull request #14708: [SPARK-17149][SQL] array.sql for testing array re...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14708 [SPARK-17149][SQL] array.sql for testing array related functions ## What changes were proposed in this pull request? This patch creates array.sql in SQLQueryTestSuite for testing array related functions, including: - indexing - array creation - size - array_contains - sort_array ## How was this patch tested? The patch itself is about adding tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17149 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14708.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14708 commit 1e89cc3c35a22b7d42fe8f9ed23f16b66e92fa20 Author: petermaxlee Date: 2016-08-19T03:33:56Z [SPARK-17149][SQL] array.sql for testing array related functions
[GitHub] spark issue #14676: [SPARK-16947][SQL] Support type coercion and foldable ex...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14676 Updated. There was a problem with merging master into the branch.
[GitHub] spark issue #14676: [SPARK-16947][SQL] Support type coercion and foldable ex...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14676 Thanks. I've reverted to supporting only foldable expressions and added nullability inference.
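Assuming "nullability inference" means deriving each column's nullable flag from the values in that column, a minimal sketch could look like this: a column is marked nullable if any row's value in it may be null. `Cell` and `inferNullability` are hypothetical names, not Spark APIs.

```scala
// Hypothetical cell: a value plus whether the expression producing it may yield null.
final case class Cell(value: Any, nullable: Boolean)

object NullabilityInference {
  /** One flag per column: true if any row's cell in that column is nullable. */
  def inferNullability(rows: Seq[Seq[Cell]]): Seq[Boolean] =
    if (rows.isEmpty) Seq.empty
    else rows.transpose.map(column => column.exists(_.nullable))
}
```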
[GitHub] spark issue #14697: [SPARK-17124][SQL] RelationalGroupedDataset.agg should p...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14697 @hvanhovell, do you have time to take a look?
[GitHub] spark pull request #14697: [SPARK-17124][SQL] RelationalGroupedDataset.agg s...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14697 [SPARK-17124][SQL] RelationalGroupedDataset.agg should be order preserving and allow multiple expressions per column ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? Added a test case in DataFrameAggregateSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17124 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14697.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14697 commit bd64ade6e3a82e9da55163e96303509275c56678 Author: petermaxlee Date: 2016-08-18T06:50:24Z [SPARK-17124][SQL] RelationalGroupedDataset.agg should be order preserving and allow duplicate column names
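The PR title names two properties: preserving order and allowing several expressions on the same column. The plain-Scala sketch below (hypothetical names, not Spark's implementation) shows how collapsing (column, function) pairs into a `Map` would lose the second property, while mapping the pairs directly keeps every expression in the caller's order. In general a `Map` also does not guarantee iteration order, although Scala's small immutable maps happen to preserve insertion order.

```scala
object AggPairs {
  // (column, aggregate function) pairs, in the order the caller wrote them.
  val requested: Seq[(String, String)] =
    Seq("price" -> "max", "qty" -> "sum", "price" -> "min")

  // A Map keeps one entry per key: the later "price" -> "min" overwrites
  // "price" -> "max", so only two aggregates survive.
  val viaMap: Map[String, String] = requested.toMap

  // Mapping the pairs directly keeps every expression, in order.
  val direct: Seq[String] = requested.map { case (col, fn) => s"$fn($col)" }
}
```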
[GitHub] spark issue #14696: [SPARK-16714][SQL] Refactor type widening for consistenc...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14696 @cloud-fan, this actually broke decimal precision. I'm starting to think that it would be better to push type coercion into each expression, so that the arithmetic expressions can special-case decimal types before calling the functions provided here. It would be a much larger change, though.
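To make the decimal precision concern concrete: a generic "wider type" for two decimals only needs enough digits to hold either input, while arithmetic result types need more. The sketch uses the per-operator rules documented in Spark's `DecimalPrecision` rule (for `e1 + e2`: precision `max(s1, s2) + max(p1 - s1, p2 - s2) + 1`, scale `max(s1, s2)`; for `e1 * e2`: precision `p1 + p2 + 1`, scale `s1 + s2`), ignores the 38-digit cap, and uses hypothetical names. It only shows why arithmetic might need its own decimal special cases; it is not the code in this PR.

```scala
object DecimalRules {
  final case class Dec(precision: Int, scale: Int)

  /** Generic widening: just enough digits to hold either input exactly. */
  def wider(a: Dec, b: Dec): Dec = {
    val scale = math.max(a.scale, b.scale)
    val intDigits = math.max(a.precision - a.scale, b.precision - b.scale)
    Dec(intDigits + scale, scale)
  }

  /** Result type of a + b: one extra integer digit to absorb a carry. */
  def add(a: Dec, b: Dec): Dec = {
    val scale = math.max(a.scale, b.scale)
    Dec(math.max(a.precision - a.scale, b.precision - b.scale) + scale + 1, scale)
  }

  /** Result type of a * b: precisions and scales add up. */
  def multiply(a: Dec, b: Dec): Dec =
    Dec(a.precision + b.precision + 1, a.scale + b.scale)
}
```

For `Dec(10, 2)` and `Dec(5, 4)`, widening gives `Dec(12, 4)` while addition needs `Dec(13, 4)` and multiplication `Dec(16, 6)`, so reusing the widened type for arithmetic would lose digits.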
[GitHub] spark pull request #14672: [SPARK-17034][SQL] Minor code cleanup for Unresol...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14672#discussion_r75252322 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala --- @@ -27,22 +27,21 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin /** * Replaces ordinal in 'order by' or 'group by' with UnresolvedOrdinal expression. */ -class UnresolvedOrdinalSubstitution(conf: CatalystConf) extends Rule[LogicalPlan] { - private def isIntegerLiteral(sorter: Expression) = IntegerIndex.unapply(sorter).nonEmpty +class SubstituteUnresolvedOrdinals(conf: CatalystConf) extends Rule[LogicalPlan] { + private def isIntLiteral(sorter: Expression) = IntegerIndex.unapply(sorter).nonEmpty --- End diff -- Good idea. I removed IntegerIndex.
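For readers unfamiliar with the rule being renamed: it rewrites integer literals in ORDER BY / GROUP BY lists (as in `ORDER BY 2`) into ordinal placeholders that are resolved later. The sketch below uses a hypothetical, simplified expression tree and a single boolean flag in place of `CatalystConf`. In Spark the behavior is governed by settings such as `spark.sql.orderByOrdinal` and `spark.sql.groupByOrdinal`, but this code does not read them.

```scala
object OrdinalSubstitution {
  // Hypothetical, simplified expression tree (not Catalyst's classes).
  sealed trait Expr
  final case class IntLit(value: Int) extends Expr
  final case class Column(name: String) extends Expr
  final case class Ordinal(position: Int) extends Expr

  /** Replaces bare integer literals in an ORDER BY / GROUP BY list with ordinals. */
  def substitute(exprs: Seq[Expr], ordinalsEnabled: Boolean): Seq[Expr] =
    if (!ordinalsEnabled) exprs
    else exprs.map {
      case IntLit(i) => Ordinal(i)
      case other     => other
    }
}
```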
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75251858 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || e.isInstanceOf[Unevaluable]) { --- End diff -- I added some comments.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75249521 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} +import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.types.LongType + +/** + * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in + * end-to-end tests (in sql/core module) for verifying the correct error messages are shown + * in negative cases. 
+ */ +class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { + + private def lit(v: Any): Literal = Literal(v) + + test("validate inputs are foldable") { +ResolveInlineTables.validateInputEvaluable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) + +// nondeterministic (rand) should be fine +ResolveInlineTables.validateInputEvaluable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Rand(1))))) + +// aggregate should not work +intercept[AnalysisException] { + ResolveInlineTables.validateInputEvaluable( +UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Count(lit(1)))))) +} + +// unresolved attribute should not work +intercept[AnalysisException] { + ResolveInlineTables.validateInputEvaluable( +UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))) --- End diff -- But how would a user construct an AttributeReference?
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75248833 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || e.isInstanceOf[Unevaluable]) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") +} + } +} + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is package visible for unit testing. + */ + private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { +if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => +if (row.size != numCols) { + table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") +} + } + + if (table.names.size != numCols) { +table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") + } +} + } + + /** + * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] + * into a [[LocalRelation]]. + * + * This function attempts to coerce inputs into consistent types. + * + * This is package visible for unit testing. + */ + private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { +val numCols = table.rows.head.size + +// For each column, traverse all the values and find a common data type. +val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) => + val inputTypes = column.map(_.dataType) + TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { --- End diff -- Postgres doesn't allow it. We can choose to be consistent with union though.
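The `convert` step in the diff finds one common type per column by transposing the rows, and fails when no common type exists without promoting values to string (the behavior the comment compares with Postgres). A toy version, with a made-up five-type lattice standing in for Spark's `DataType`s and `TypeCoercion`, could look like this; the error message mirrors the one in the diff, and all names are hypothetical.

```scala
object InlineTableTypes {
  // Toy type lattice: NullT widens to anything, numerics widen upward,
  // and StringT only combines with StringT (no string promotion).
  sealed trait DType
  case object NullT extends DType
  case object IntT extends DType
  case object LongT extends DType
  case object DoubleT extends DType
  case object StringT extends DType

  private val numericOrder: Seq[DType] = Seq(IntT, LongT, DoubleT)

  /** Wider of two types, or None if they are incompatible. */
  def wider(a: DType, b: DType): Option[DType] = (a, b) match {
    case (x, y) if x == y => Some(x)
    case (NullT, y)       => Some(y)
    case (x, NullT)       => Some(x)
    case (x, y) if numericOrder.contains(x) && numericOrder.contains(y) =>
      Some(if (numericOrder.indexOf(x) >= numericOrder.indexOf(y)) x else y)
    case _ => None
  }

  /** One target type per column, or the error for the first incompatible column. */
  def targetTypes(names: Seq[String], rows: Seq[Seq[DType]]): Either[String, Seq[DType]] = {
    val perColumn = rows.transpose.zip(names).map { case (column, name) =>
      column.tail
        .foldLeft(Option(column.head)) { (acc, t) => acc.flatMap(wider(_, t)) }
        .toRight(s"incompatible types found in column $name for inline table")
    }
    perColumn.collectFirst { case Left(err) => err }
      .toLeft(perColumn.collect { case Right(t) => t })
  }
}
```

Being consistent with union instead would amount to swapping in a `wider` that also allows string promotion.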
[GitHub] spark pull request #14389: [SPARK-16714][SQL] Refactor type widening for con...
Github user petermaxlee closed the pull request at: https://github.com/apache/spark/pull/14389
[GitHub] spark issue #14389: [SPARK-16714][SQL] Refactor type widening for consistenc...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14389 Sorry that it has taken this long. I have submitted a work-in-progress pull request at https://github.com/apache/spark/pull/14696. I'm going to close this one and continue the work there, since it is a fairly different pull request.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75248513 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || e.isInstanceOf[Unevaluable]) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") +} + } +} + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is package visible for unit testing. + */ + private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { +if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => --- End diff -- That's a good idea. Let me do that.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75248427 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || e.isInstanceOf[Unevaluable]) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") +} + } +} + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is package visible for unit testing. + */ + private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { +if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => +if (row.size != numCols) { + table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") +} + } + + if (table.names.size != numCols) { +table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") + } +} + } + + /** + * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] + * into a [[LocalRelation]]. + * + * This function attempts to coerce inputs into consistent types. + * + * This is package visible for unit testing. + */ + private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { +val numCols = table.rows.head.size + +// For each column, traverse all the values and find a common data type. +val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) => + val inputTypes = column.map(_.dataType) + TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { +table.failAnalysis(s"incompatible types found in column $name for inline table") + } +} +assert(targetTypes.size == table.names.size) --- End diff -- Asserts are not meant to be user-facing. They are meant to be defensive against programming errors (i.e., bugs in Spark).
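The distinction drawn here can be sketched as follows: problems in the user's query are reported with a descriptive error the user can act on, while an `assert` guards an invariant that earlier validation already guarantees, so its failure would indicate a bug. `AnalysisError`, `validateDimension`, and `toColumns` are hypothetical stand-ins for `failAnalysis`, `validateInputDimension`, and `convert`; the messages mirror those in the diff.

```scala
object InlineTableChecks {
  // Hypothetical stand-in for a user-facing analysis error.
  final class AnalysisError(message: String) extends Exception(message)

  /** User input problems: report a clear error that the user can act on. */
  def validateDimension(names: Seq[String], rows: Seq[Seq[Any]]): Unit =
    if (rows.nonEmpty) {
      val numCols = rows.head.size
      rows.zipWithIndex.foreach { case (row, ri) =>
        if (row.size != numCols) {
          throw new AnalysisError(s"expected $numCols columns but found ${row.size} columns in row $ri")
        }
      }
      if (names.size != numCols) {
        throw new AnalysisError(s"expected ${names.size} columns but found $numCols in first row")
      }
    }

  /** Internal invariant: if the assert fails, the bug is in this code, not the query. */
  def toColumns(names: Seq[String], rows: Seq[Seq[Any]]): Seq[Seq[Any]] = {
    validateDimension(names, rows)
    val columns = rows.transpose
    assert(rows.isEmpty || columns.size == names.size, "validateDimension should have caught this")
    columns
  }
}
```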
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75248346 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { --- End diff -- This was suggested by @hvanhovell. I think private functions are still meant to be private. This one is only package-visible for the purpose of testing. That is to say, I don't expect developers to call this function either.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75248351 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputEvaluable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is package visible for unit testing. 
+ */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || e.isInstanceOf[Unevaluable]) { --- End diff -- If we check foldable, rand() wouldn't work.
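The trade-off in the comment above can be shown with a toy expression model. This is a minimal sketch and not Catalyst's API. The `Expr` hierarchy, `Lit`, `Rand`, `Count`, `Add`, and the `foldable`/`evaluable` flags are hypothetical stand-ins. They only mimic the distinction under discussion: a nondeterministic function is not foldable but can still be evaluated, whereas an aggregate cannot be evaluated outside an aggregation.

```scala
// Hypothetical toy model; these are not Catalyst classes.
sealed trait Expr {
  def children: Seq[Expr]
  // Foldable: deterministic and computable from constants alone.
  def foldable: Boolean
  // Evaluable: can be computed without an aggregation context.
  def evaluable: Boolean
}

case class Lit(value: Any) extends Expr {
  val children: Seq[Expr] = Nil
  val foldable = true
  val evaluable = true
}

// Nondeterministic: each evaluation may differ, so it is not foldable.
case class Rand(seed: Long) extends Expr {
  val children: Seq[Expr] = Nil
  val foldable = false
  val evaluable = true
}

// Aggregate: only meaningful inside an aggregation, so not evaluable here.
case class Count(child: Expr) extends Expr {
  val children: Seq[Expr] = Seq(child)
  val foldable = false
  val evaluable = false
}

case class Add(left: Expr, right: Expr) extends Expr {
  val children: Seq[Expr] = Seq(left, right)
  def foldable: Boolean = children.forall(_.foldable)
  def evaluable: Boolean = children.forall(_.evaluable)
}

object InlineTableChecks {
  // Rejects rand() as well as count(1).
  def passesFoldableCheck(e: Expr): Boolean = e.foldable
  // Accepts rand() but rejects count(1), even when it is nested.
  def passesEvaluableCheck(e: Expr): Boolean = e.evaluable
}
```

In this sketch `evaluable` is computed over the whole tree. A check on the top-level node alone, such as a single `isInstanceOf` test, might not catch an aggregate nested inside another expression.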
[GitHub] spark pull request #14696: [SPARK-16714][SQL] Refactor type widening for con...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14696 [SPARK-16714][SQL] Refactor type widening for consistency - WIP ## What changes were proposed in this pull request? WIP ## How was this patch tested? WIP You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-16714-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14696.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14696 commit 9df455107cf89b590ca0bcac807ea8671ccab344 Author: petermaxlee Date: 2016-08-18T04:44:21Z [SPARK-16714][SQL] Refactor type widening for consistency
[GitHub] spark issue #14695: [SPARK-17117][SQL] 1 / NULL should not fail analysis
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14695 @JoshRosen can you take a look at this?
[GitHub] spark pull request #14695: [SPARK-17117][SQL] 1 / NULL should not fail analy...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14695 [SPARK-17117][SQL] 1 / NULL should not fail analysis ## What changes were proposed in this pull request? This patch fixes the problem described in SPARK-17117, i.e. "SELECT 1 / NULL" throws an analysis exception: ``` org.apache.spark.sql.AnalysisException: cannot resolve '(1 / NULL)' due to data type mismatch: differing types in '(1 / NULL)' (int and null). ``` The problem is that division type coercion did not take null type into account. ## How was this patch tested? A unit test for the type coercion, and a few end-to-end test cases using SQLQueryTestSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17117 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14695.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14695 commit 34fddc7a10aa395a01687e040ef523df0bad1610 Author: petermaxlee Date: 2016-08-18T03:47:00Z [SPARK-17117][SQL] 1 / NULL should not fail analysis commit a946269811540d6cdb2237c62f095f847b461cee Author: petermaxlee Date: 2016-08-18T03:54:15Z Add end-to-end tests
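The failure mode described in this PR can be modeled in a few lines. This is a toy sketch and not Spark's `TypeCoercion` code. It assumes a division rule that casts both operands to double only when both are numeric. A `NULL` literal has a null type, which is not numeric, so such a rule leaves `1 / NULL` untouched and type checking later reports "differing types". The `DType` values and `DivisionCoercion` object are hypothetical.

```scala
// Hypothetical toy type system; not Spark's DataType hierarchy.
sealed trait DType
case object IntT extends DType
case object LongT extends DType
case object DoubleT extends DType
case object NullT extends DType
case object StringT extends DType

object DivisionCoercion {
  private def isNumeric(t: DType): Boolean = t == IntT || t == LongT || t == DoubleT

  // Before the fix (as modeled here): only numeric operands are coerced,
  // so (IntT, NullT) is left unchanged and later fails type checking.
  def coerceBefore(l: DType, r: DType): Option[(DType, DType)] =
    if (isNumeric(l) && isNumeric(r)) Some((DoubleT, DoubleT)) else None

  // After the fix (as modeled here): a null-typed operand is also accepted,
  // because a NULL literal can be cast to any numeric type.
  private def isNumericOrNull(t: DType): Boolean = isNumeric(t) || t == NullT

  def coerceAfter(l: DType, r: DType): Option[(DType, DType)] =
    if (isNumericOrNull(l) && isNumericOrNull(r)) Some((DoubleT, DoubleT)) else None
}
```

Non-numeric operands such as strings are still rejected by both versions of the toy rule.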
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75244731 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputFoldable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is publicly visible for unit testing. 
+ */ + def validateInputFoldable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || !e.foldable) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") +} + } +} + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is publicly visible for unit testing. + */ + def validateInputDimension(table: UnresolvedInlineTable): Unit = { +if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => +if (row.size != numCols) { + table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") +} + } + + if (table.names.size != numCols) { +table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") + } +} + } + + /** + * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] + * into a [[LocalRelation]]. + * + * This function attempts to coerce inputs into consistent types. + * + * This is publicly visible for unit testing. + */ + def convert(table: UnresolvedInlineTable): LocalRelation = { +val numCols = table.rows.head.size + +// For each column, traverse all the values and find a common data type. +val targetTypes = Seq.tabulate(numCols) { ci => --- End diff -- I didn't check for nullability, since I don't think it actually matters much for the purpose we are using this feature for. What would be useful is to be able to define the data type explicitly, and then we can do controlled tests for nullability.
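The two dimension checks in the `validateInputDimension` doc comment can be sketched without Catalyst. In this hypothetical version, rows are plain `Seq[Any]` values. Failures come back as `Left` error messages instead of raising `AnalysisException`. The messages reuse the format strings from the diff. `InlineTableShape` is an illustrative name.

```scala
object InlineTableShape {
  // Returns Right(numCols) if the table is well-formed, or Left(error) otherwise.
  def validate(names: Seq[String], rows: Seq[Seq[Any]]): Either[String, Int] = {
    // Like the diff, skip validation for an empty table (a choice made for this sketch).
    if (rows.isEmpty) Right(names.size)
    else {
      val numCols = rows.head.size
      // Check 1: every row has the same number of values as the first row.
      val badRow = rows.zipWithIndex.collectFirst {
        case (row, ri) if row.size != numCols =>
          s"expected $numCols columns but found ${row.size} columns in row $ri"
      }
      badRow match {
        case Some(msg) => Left(msg)
        // Check 2: the number of column aliases matches the number of values per row.
        case None if names.size != numCols =>
          Left(s"expected ${names.size} columns but found $numCols in first row")
        case None => Right(numCols)
      }
    }
  }
}
```

Run on the shapes used in the `inline-table.sql` test file, the sketch yields the same messages as queries 10 and 12 in the expected output, minus the position suffix.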
[GitHub] spark issue #14676: [SPARK-16947][SQL] Support type coercion and foldable ex...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14676 Thanks for the review. I have updated the pull request.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75228342 --- Diff: sql/core/src/test/resources/sql-tests/results/inline-table.sql.out --- @@ -0,0 +1,135 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 15 + + +-- !query 0 +select * from values ("one", 1) +-- !query 0 schema +struct +-- !query 0 output +one1 + + +-- !query 1 +select * from values ("one", 1) as data +-- !query 1 schema +struct +-- !query 1 output +one1 + + +-- !query 2 +select * from values ("one", 1) as data(a, b) +-- !query 2 schema +struct +-- !query 2 output +one1 + + +-- !query 3 +select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b) +-- !query 3 schema +struct +-- !query 3 output +one1 +three NULL +two2 + + +-- !query 4 +select * from values ("one", null), ("two", null) as data(a, b) +-- !query 4 schema +struct +-- !query 4 output +oneNULL +twoNULL + + +-- !query 5 +select * from values ("one", 1), ("two", 2L) as data(a, b) +-- !query 5 schema +struct +-- !query 5 output +one1 +two2 + + +-- !query 6 +select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) +-- !query 6 schema +struct +-- !query 6 output +one1 +two4 + + +-- !query 7 +select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) +-- !query 7 schema +struct> +-- !query 7 output +one[0,1] +two[2,3] + + +-- !query 8 +select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) +-- !query 8 schema +struct +-- !query 8 output +one2.0 +two3.0 + + +-- !query 9 +select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) +-- !query 9 schema +struct +-- !query 9 output +one0.087440518337355 +two3.0 + + +-- !query 10 +select * from values ("one", 2.0), ("two") as data(a, b) +-- !query 10 schema +struct<> +-- !query 10 output +org.apache.spark.sql.AnalysisException +expected 2 columns but found 1 columns in row 1; line 1 pos 14 + + +-- !query 11 +select * from values ("one", array(0, 1)), 
("two", struct(1, 2)) as data(a, b) +-- !query 11 schema +struct<> +-- !query 11 output +org.apache.spark.sql.AnalysisException +incompatible types found in column b for inline table; line 1 pos 14 + + +-- !query 12 +select * from values ("one"), ("two") as data(a, b) +-- !query 12 schema +struct<> +-- !query 12 output +org.apache.spark.sql.AnalysisException +expected 2 columns but found 1 in first row; line 1 pos 14 + + +-- !query 13 +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +-- !query 13 schema +struct<> +-- !query 13 output +org.apache.spark.sql.AnalysisException +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 + + +-- !query 14 +select * from values ("one", count(1)), ("two", 2) as data(a, b) +-- !query 14 schema +struct<> +-- !query 14 output +java.lang.UnsupportedOperationException +Cannot evaluate expression: count(1) --- End diff -- This is not a great exception, because it is thrown by the projection, not during analysis.
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75228185 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + // validateInputFoldable(table) --- End diff -- @hvanhovell what's a good check to do here? I don't want to rule out rand but do want to rule out count(1). 
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75227645 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { +case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputFoldable(table) + convert(table) + } + + /** + * Validates that all inline table data are foldable expressions. + * + * This is publicly visible for unit testing. 
+ */ + def validateInputFoldable(table: UnresolvedInlineTable): Unit = { +table.rows.foreach { row => + row.foreach { e => +if (!e.resolved || !e.foldable) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") +} + } +} + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is publicly visible for unit testing. + */ + def validateInputDimension(table: UnresolvedInlineTable): Unit = { +if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => +if (row.size != numCols) { + table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") +} + } + + if (table.names.size != numCols) { +table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") + } +} + } + + /** + * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] + * into a [[LocalRelation]]. + * + * This function attempts to coerce inputs into consistent types. + * + * This is publicly visible for unit testing. + */ + def convert(table: UnresolvedInlineTable): LocalRelation = { +val numCols = table.rows.head.size + +// For each column, traverse all the values and find a common data type. +val targetTypes = Seq.tabulate(numCols) { ci => + val inputTypes = table.rows.map(_(ci).dataType) + TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { +table.failAnalysis(s"incompatible types found in column $ci for inline table") + } +} +assert(targetTypes.size == table.names.size) + +val newRows: Seq[InternalRow] = table.rows.map { row => + InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => +val targetType = targetTypes(ci) +if (e.dataType.sameType(targetType)) { + e.eval() --- End diff -- How do we determine if something is valid here? 
If we don't do a foldable check and want to support nondeterministic functions, how do we rule out something like count(1)?
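The per-column coercion in `convert` can be modeled as folding a "wider type" function over each column. This is a minimal sketch that assumes a toy numeric order (int < bigint < double) plus a null type that widens to anything. It is not `TypeCoercion.findWiderTypeWithoutStringPromotion`. As that function's name suggests, though, the sketch never promotes to string. All type and object names are hypothetical.

```scala
// Hypothetical toy types; not Spark's DataType classes.
sealed trait SqlType
case object NullType extends SqlType
case object IntType extends SqlType
case object BigIntType extends SqlType
case object DoubleType extends SqlType
case object StringType extends SqlType

object ColumnWidening {
  // Rank in a toy numeric precedence order; non-numeric types have no rank.
  private val numericRank: Map[SqlType, Int] =
    Map(IntType -> 0, BigIntType -> 1, DoubleType -> 2)

  // Wider of two types, or None if they are incompatible.
  def wider(a: SqlType, b: SqlType): Option[SqlType] = (a, b) match {
    case (x, y) if x == y => Some(x)
    case (NullType, y) => Some(y)
    case (x, NullType) => Some(x)
    case (x, y) if numericRank.contains(x) && numericRank.contains(y) =>
      Some(if (numericRank(x) >= numericRank(y)) x else y)
    case _ => None // e.g. string vs. int: no string promotion
  }

  // For each column index, fold `wider` over that column's types across all rows.
  def targetTypes(rows: Seq[Seq[SqlType]]): Either[String, Seq[SqlType]] = {
    val numCols = rows.head.size
    val perColumn = (0 until numCols).map { ci =>
      rows.map(_(ci)).foldLeft(Option[SqlType](NullType)) {
        case (Some(acc), t) => wider(acc, t)
        case (None, _) => None
      }.toRight(s"incompatible types found in column $ci for inline table")
    }
    perColumn.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None => Right(perColumn.collect { case Right(t) => t })
    }
  }
}
```

With this model, `("one", 1), ("two", 2L)` widens the second column to bigint. A `null` in a column takes on the type of the other values.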
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75173027 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -50,6 +50,23 @@ case class UnresolvedRelation( } /** + * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into + * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. + * + * @param names list of column names + * @param rows expressions for the data + */ +case class UnresolvedInlineTable( +names: Seq[String], +rows: Seq[Seq[Expression]]) + extends LeafNode { + + lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) --- End diff -- I do want this memoized, so a lazy val is better here.
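The difference between a `def` and a `lazy val` for a check like `expressionsResolved` can be seen in a small standalone example. The `Node` class and its `evalCount` counter are hypothetical. They only count how often the check body runs.

```scala
// Hypothetical node that counts how many times its resolution check runs.
class Node(rows: Seq[Seq[Boolean]]) {
  var evalCount = 0

  private def check(): Boolean = {
    evalCount += 1
    rows.forall(_.forall(identity))
  }

  // Recomputed on every access.
  def resolvedDef: Boolean = check()

  // Computed on first access, then cached for the lifetime of this instance.
  lazy val resolvedLazy: Boolean = check()
}
```

Caching is only safe because the node's `rows` never change after construction. A tree transform that changes resolution produces a new instance, which computes its own value.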
[GitHub] spark issue #14672: [SPARK-17034][SQL] Minor code cleanup for UnresolvedOrdi...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14672 cc @cloud-fan
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14676#discussion_r75065140 --- Diff: sql/core/src/test/resources/sql-tests/inputs/inline-table.sql --- @@ -0,0 +1,39 @@ + +-- single row, without table and column alias +select * from values ("one", 1); + +-- single row, without column alias +select * from values ("one", 1) as data; + +-- single row +select * from values ("one", 1) as data(a, b); --- End diff -- added
[GitHub] spark issue #14676: [SPARK-16947][SQL] Support type coercion and foldable ex...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14676 cc @hvanhovell and @cloud-fan. I followed @cloud-fan's suggestion to use the analyzer to replace inline tables with LocalRelation. One thing that is broken about inline tables is SQL generation. We might need to implement SQL generation for LocalRelation.
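Here is one hypothetical shape SQL generation for a LocalRelation could take: rendering each row back into an inline `VALUES` clause, using the same syntax as the `inline-table.sql` tests. The `ValuesSql` object is an illustrative assumption. It handles only strings, longs, nulls, and values whose `toString` is a valid literal. Real SQL generation would need per-type handling, especially for floating-point, decimal, and complex types.

```scala
object ValuesSql {
  // Renders one value as a SQL literal; only a few types are handled.
  private def literal(v: Any): String = v match {
    case null => "NULL"
    // Escape backslashes and single quotes inside the string literal.
    case s: String => "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"
    case l: Long => s"${l}L"
    case other => other.toString
  }

  // Produces e.g. VALUES ('one', 1), ('two', 2L) AS data(a, b)
  def render(alias: String, names: Seq[String], rows: Seq[Seq[Any]]): String = {
    val body = rows.map(r => r.map(literal).mkString("(", ", ", ")")).mkString(", ")
    s"VALUES $body AS $alias(${names.mkString(", ")})"
  }
}
```

The rendered text can be prefixed with `SELECT * FROM`, which yields a query in the same form as the test inputs.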
[GitHub] spark pull request #14676: [SPARK-16947][SQL] Support type coercion and fold...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14676 [SPARK-16947][SQL] Support type coercion and foldable expression for inline tables ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? Added a new unit test suite ResolveInlineTablesSuite and a new file-based end-to-end test inline-table.sql. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-16947 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14676.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14676 commit d7acae55034d4ff5da3e7579cf44acb7b704b4a1 Author: petermaxlee Date: 2016-08-17T00:40:07Z [SPARK-16947][SQL] Support type coercion and foldable expression for inline tables
[GitHub] spark pull request #14672: [SPARK-17034][SQL] Minor code cleanup for Unresol...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14672 [SPARK-17034][SQL] Minor code cleanup for UnresolvedOrdinal ## What changes were proposed in this pull request? I was looking at the code for UnresolvedOrdinal and made a few small changes to make it slightly clearer: 1. Rename the rule to SubstituteUnresolvedOrdinals, which is more consistent with other rules that start with verbs. Note that this is still inconsistent with CTESubstitution and WindowsSubstitution. 2. Break the test suite down from a single test case into three test cases. ## How was this patch tested? This is a minor cleanup. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17034 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14672.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14672 commit 0f4434865e42c3f9175930f590dcb5e667b517b2 Author: petermaxlee Date: 2016-08-16T20:50:37Z [SPARK-17034][SQL] Minor code cleanup for UnresolvedOrdinal
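The rule being renamed deals with SQL ordinals such as `ORDER BY 1`. The toy sketch below shows only the underlying idea: a 1-based position maps to an item in the select list. It assumes plain column names as the select list. In Catalyst the work is split up. The rule substitutes a placeholder (`UnresolvedOrdinal`) that is resolved in a later step. The `OrdinalSubstitution` object and its error text are hypothetical.

```scala
object OrdinalSubstitution {
  // Replaces each 1-based ordinal with the select-list item at that position.
  def resolve(selectList: Seq[String], orderBy: Seq[Int]): Either[String, Seq[String]] = {
    val resolved: Seq[Either[String, String]] = orderBy.map { ord =>
      if (ord >= 1 && ord <= selectList.size) Right(selectList(ord - 1))
      else Left(s"position $ord is out of range [1, ${selectList.size}]")
    }
    // The first error wins; otherwise return all resolved columns in order.
    resolved.collectFirst { case Left(e) => e }
      .toLeft(resolved.collect { case Right(c) => c })
  }
}
```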
[GitHub] spark pull request #14539: [SPARK-16947][SQL] Improve type coercion for inli...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14539#discussion_r75012302 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -656,40 +656,36 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an inline table (a virtual table in Hive parlance). */ override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) { -// Get the backing expressions. -val expressions = ctx.expression.asScala.map { eCtx => - val e = expression(eCtx) - assert(e.foldable, "All expressions in an inline table must be constants.", eCtx) - e -} - -// Validate and evaluate the rows. -val (structType, structConstructor) = expressions.head.dataType match { - case st: StructType => -(st, (e: Expression) => e) - case dt => -val st = CreateStruct(Seq(expressions.head)).dataType -(st, (e: Expression) => CreateStruct(Seq(e))) -} -val rows = expressions.map { - case expression => -val safe = Cast(structConstructor(expression), structType) -safe.eval().asInstanceOf[InternalRow] +// Create expressions. +val rows = ctx.expression.asScala.map { e => + expression(e) match { +case CreateStruct(children) => children --- End diff -- Actually I think I understand what's happening here now.
[GitHub] spark pull request #14539: [SPARK-16947][SQL] Improve type coercion for inli...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14539#discussion_r75012141 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -656,40 +656,36 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an inline table (a virtual table in Hive parlance). */ override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) { -// Get the backing expressions. -val expressions = ctx.expression.asScala.map { eCtx => - val e = expression(eCtx) - assert(e.foldable, "All expressions in an inline table must be constants.", eCtx) - e -} - -// Validate and evaluate the rows. -val (structType, structConstructor) = expressions.head.dataType match { - case st: StructType => -(st, (e: Expression) => e) - case dt => -val st = CreateStruct(Seq(expressions.head)).dataType -(st, (e: Expression) => CreateStruct(Seq(e))) -} -val rows = expressions.map { - case expression => -val safe = Cast(structConstructor(expression), structType) -safe.eval().asInstanceOf[InternalRow] +// Create expressions. +val rows = ctx.expression.asScala.map { e => + expression(e) match { +case CreateStruct(children) => children --- End diff -- @hvanhovell what's this about? Why do we need to expand struct?
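One plausible reading of the `case CreateStruct(children) => children` line is this. A parenthesized multi-value row such as `("one", 1)` arrives from the parser as a struct constructor, and unwrapping it recovers the individual column values. Any other expression becomes a one-column row. The toy AST below (`Ast`, `Value`, `Struct`, `InlineRows`) is hypothetical and only illustrates that reading. The excerpt does not show the remaining match cases.

```scala
// Hypothetical toy AST; not Catalyst's Expression classes.
sealed trait Ast
case class Value(v: Any) extends Ast
case class Struct(children: Seq[Ast]) extends Ast

object InlineRows {
  // A multi-value row is assumed to arrive as Struct(...): unwrap it into
  // its column values. Anything else forms a single-column row.
  def toRow(e: Ast): Seq[Ast] = e match {
    case Struct(children) => children
    case other => Seq(other)
  }
}
```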
[GitHub] spark issue #14539: [SPARK-16947][SQL] Improve type coercion for inline tabl...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14539 @hvanhovell do you mind if I take a look at this? I am running into an issue in which I cannot use the array() function to construct an array in inline tables (only literals are allowed). I can try to fix the type coercion issue there too.
[GitHub] spark pull request #14608: [SPARK-17013][SQL] Parse negative numeric literal...
GitHub user petermaxlee opened a pull request: https://github.com/apache/spark/pull/14608 [SPARK-17013][SQL] Parse negative numeric literals ## What changes were proposed in this pull request? This patch updates the SQL parser to parse negative numeric literals as numeric literals, instead of as unary minus applied to positive literals. This allows the parser to parse the minimal value for each data type, e.g. "-32768S". ## How was this patch tested? Updated test cases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/petermaxlee/spark SPARK-17013 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14608.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14608 commit 1ae7a3b9f63a9787304b74bc10914b436574eb78 Author: petermaxlee Date: 2016-08-11T21:02:10Z [SPARK-17013][SQL] Parse negative numeric literals
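The motivation for this change can be checked with plain Scala. A `Short` ranges from -32768 to 32767, so 32768 does not fit. Parsing the digits first and negating afterwards therefore fails, while parsing the signed text as a whole succeeds. The `NegativeLiterals` object is a hypothetical illustration of the two strategies. It is not the Spark parser code.

```scala
import scala.util.Try

object NegativeLiterals {
  // Unary-minus strategy: parse the magnitude as a Short, then negate it.
  def parseThenNegate(digits: String): Try[Short] =
    Try(digits.toShort).map(v => (-v).toShort)

  // Signed-literal strategy: parse the sign together with the digits.
  def parseSigned(digits: String): Try[Short] =
    Try(("-" + digits).toShort)
}
```

For the digits `32768`, the first strategy fails with a `NumberFormatException`. The second returns `Short.MinValue`, which corresponds to the `-32768S` case in the PR description.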
[GitHub] spark issue #14599: [SPARK-17013][SQL] handle corner case for negative integ...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14599 Thanks. How about this one? https://github.com/apache/spark/pull/14608 It takes the approach used here but applies it to all types for consistency.
[GitHub] spark issue #14598: [SPARK-17018][SQL] literals.sql for testing literal pars...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14598 I also didn't include \b and \0 parsing; otherwise GitHub shows the result file as binary and refuses to display the diff, which makes it more difficult to review.
[GitHub] spark issue #14598: [SPARK-17018][SQL] literals.sql for testing literal pars...
Github user petermaxlee commented on the issue: https://github.com/apache/spark/pull/14598 I have updated this to include more string literals and added timestamp/date/interval parsing. That said, I didn't add all the test cases for interval because there are a large number of them, and I felt those are best left for parser unit tests.
[GitHub] spark pull request #14598: [SPARK-17018][SQL] literals.sql for testing liter...
Github user petermaxlee commented on a diff in the pull request: https://github.com/apache/spark/pull/14598#discussion_r74472437

--- Diff: sql/core/src/test/resources/sql-tests/results/literals.sql.out ---
@@ -0,0 +1,148 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 16
+
+
+-- !query 0
+select null
+-- !query 0 schema
+struct
+-- !query 0 output
+NULL
+
+
+-- !query 1
+select true, false
+-- !query 1 schema
+struct
+-- !query 1 output
+true false
+
+
+-- !query 2
+select 1Y, 127Y, -127Y
+-- !query 2 schema
+struct<1:tinyint,127:tinyint,(-127):tinyint>
+-- !query 2 output
+1 127 -127
+
+
+-- !query 3
+select 128Y
+-- !query 3 schema
+struct<>
+-- !query 3 output
+org.apache.spark.sql.catalyst.parser.ParseException
+
+Value out of range. Value:"128" Radix:10(line 1, pos 7)
+
+== SQL ==
+select 128Y
+---^^^
+
+
+-- !query 4
+select 1S, 32767S, -32767S
+-- !query 4 schema
+struct<1:smallint,32767:smallint,(-32767):smallint>
+-- !query 4 output
+1 32767 -32767
+
+
+-- !query 5
+select 32768S
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.catalyst.parser.ParseException
+
+Value out of range. Value:"32768" Radix:10(line 1, pos 7)
+
+== SQL ==
+select 32768S
+---^^^
+
+
+-- !query 6
+select 1L, 2147483648L, 9223372036854775807L
+-- !query 6 schema
+struct<1:bigint,2147483648:bigint,9223372036854775807:bigint>
+-- !query 6 output
+1 2147483648 9223372036854775807
+
+
+-- !query 7
+select 9223372036854775808L
+-- !query 7 schema
+struct<>
+-- !query 7 output
+org.apache.spark.sql.catalyst.parser.ParseException

--- End diff --

Sure. Will do.
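The `literals.sql.out` output in the diff above exercises, for each suffixed integer type, values inside the type's range and the first value past its maximum. A minimal Scala sketch of that kind of range check follows. It is not Spark's `AstBuilder` code: the object, case classes, and `parse` method are hypothetical, and the sketch assumes TINYINT, SMALLINT, and BIGINT correspond to Scala's `Byte`, `Short`, and `Long`. The `Value out of range. Value:"128" Radix:10` text in the output happens to match the message format of `java.lang.Byte.parseByte` and `java.lang.Short.parseShort`, so the sketch simply delegates the range check to the JDK.

```scala
// Hypothetical sketch, not Spark's AstBuilder: dispatches on the literal's
// type suffix (Y = TINYINT, S = SMALLINT, L = BIGINT) and relies on the JDK
// parse methods, which throw NumberFormatException for out-of-range values.
object SuffixedLiteralSketch {
  sealed trait Lit
  final case class TinyIntLit(value: Byte) extends Lit
  final case class SmallIntLit(value: Short) extends Lit
  final case class BigIntLit(value: Long) extends Lit

  def parse(text: String): Either[String, Lit] =
    if (text.length < 2) Left(s"Not a suffixed literal: $text")
    else {
      // Everything except the final suffix character, including any leading sign.
      val digits = text.dropRight(1)
      try {
        text.last match {
          case 'Y' => Right(TinyIntLit(java.lang.Byte.parseByte(digits)))
          case 'S' => Right(SmallIntLit(java.lang.Short.parseShort(digits)))
          case 'L' => Right(BigIntLit(java.lang.Long.parseLong(digits)))
          case c   => Left(s"Unsupported suffix: $c")
        }
      } catch {
        // Out-of-range or malformed digits surface as a Left with the JDK message.
        case e: NumberFormatException => Left(e.getMessage)
      }
    }
}
```

Because the sign stays attached to the digits, `parse("-128Y")` also succeeds in this sketch, which is in the spirit of the negative-literal handling proposed in SPARK-17013.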