[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16133 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90794591 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("input_file_name - FileScanRDD") { + test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name()) -.head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = spark.read.parquet(dir.getCanonicalPath).select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) --- End diff -- I see. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90783813 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("input_file_name - FileScanRDD") { + test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name()) -.head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = spark.read.parquet(dir.getCanonicalPath).select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) --- End diff -- I'm actually intentionally not adding those because I don't know how common these expressions will be. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90774453 --- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala --- @@ -132,54 +132,57 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[NewHadoopPartition] + private val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = getConf + private val conf = getConf - val inputMetrics = context.taskMetrics().inputMetrics - val existingBytesRead = inputMetrics.bytesRead + private val inputMetrics = context.taskMetrics().inputMetrics + private val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name --- End diff -- I agree with it and also a comment in `HadoopRDD`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90774307 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("input_file_name - FileScanRDD") { + test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name()) -.head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = spark.read.parquet(dir.getCanonicalPath).select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) --- End diff -- We should add `input_file_block_start()`, `input_file_block_length()` to `functions.scala` and use them the same as `input_file_name()`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90774314 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -567,10 +590,22 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { classOf[LongWritable], classOf[Text]) val df = rdd.map(pair => pair._2.toString).toDF() - val answer = df.select(input_file_name()).head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = df.select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) --- End diff -- ditto. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90774313 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { ) } - test("input_file_name - FileScanRDD") { + test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { withTempPath { dir => val data = sparkContext.parallelize(0 to 10).toDF("id") data.write.parquet(dir.getCanonicalPath) - val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name()) -.head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = spark.read.parquet(dir.getCanonicalPath).select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.getCanonicalPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + + // Now read directly from the original RDD without going through any files to make sure + // we are returning empty string, -1, and -1. + checkAnswer( +data.select( + input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()") +).limit(1), +Row("", -1L, -1L)) } } - test("input_file_name - HadoopRDD") { + test("input_file_name, input_file_block_start, input_file_block_length - HadoopRDD") { withTempPath { dir => val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF() data.write.text(dir.getCanonicalPath) val df = spark.sparkContext.textFile(dir.getCanonicalPath).toDF() - val answer = df.select(input_file_name()).head.getString(0) - assert(answer.contains(dir.getCanonicalPath)) - checkAnswer(data.select(input_file_name()).limit(1), Row("")) + // Test the 3 expressions when reading from files + val q = df.select( +input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")) --- End diff -- ditto. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/16133#discussion_r90771895 --- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala --- @@ -132,54 +132,57 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[NewHadoopPartition] + private val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = getConf + private val conf = getConf - val inputMetrics = context.taskMetrics().inputMetrics - val existingBytesRead = inputMetrics.bytesRead + private val inputMetrics = context.taskMetrics().inputMetrics + private val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name --- End diff -- nit. Updating comment together? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...
GitHub user rxin opened a pull request: https://github.com/apache/spark/pull/16133 [SPARK-18702][SQL] input_file_block_start and input_file_block_length ## What changes were proposed in this pull request? We currently have function input_file_name to get the path of the input file, but don't have functions to get the block start offset and length. This patch introduces two functions: 1. input_file_block_start: returns the file block start offset, or -1 if not available. 2. input_file_block_length: returns the file block length, or -1 if not available. ## How was this patch tested? Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rxin/spark SPARK-18702 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16133 commit 7713ebe2f2f2703544b964157f2ae60da48a22a0 Author: Reynold XinDate: 2016-12-04T05:40:21Z [SPARK-18702][SQL] input_file_block_start and input_file_block_length function --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org