[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16133





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90794591
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }

-  test("input_file_name - FileScanRDD") {
+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)
-      val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name())
-        .head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = spark.read.parquet(dir.getCanonicalPath).select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
--- End diff --

I see.





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90783813
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }

-  test("input_file_name - FileScanRDD") {
+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)
-      val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name())
-        .head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = spark.read.parquet(dir.getCanonicalPath).select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
--- End diff --

I'm actually intentionally not adding those because I don't know how common 
these expressions will be.





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90774453
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---
@@ -132,54 +132,57 @@ class NewHadoopRDD[K, V](

   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[NewHadoopPartition]
+      private val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf
+      private val conf = getConf

-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead

       // Sets the thread local variable for the file's name
--- End diff --

I agree with it, and there is a similar comment in `HadoopRDD` that should be updated as well.
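
For background on the mechanism under discussion: the reader RDD records the current file name (and, with this change, the block start offset and length) in a per-task thread-local before handing rows to the iterator, and the input_file_* expressions read it back; when nothing has been set, they fall back to "" and -1, which is what the updated tests assert. A self-contained sketch of that pattern (the object and method names here are illustrative only, not Spark's actual internals):

    // Illustrative holder for the current input file/block, mimicking the
    // thread-local pattern discussed in this diff. Not Spark's real class.
    object FileBlockInfoHolder {
      private case class Info(file: String, start: Long, length: Long)

      private val current = new ThreadLocal[Info] {
        override def initialValue(): Info = Info("", -1L, -1L)
      }

      // Called by the reader before iterating over a split.
      def set(file: String, start: Long, length: Long): Unit =
        current.set(Info(file, start, length))

      // Called when the task is done with the split.
      def unset(): Unit = current.remove()

      // What input_file_name / input_file_block_start / input_file_block_length
      // would observe on the current task thread.
      def fileName: String = current.get().file
      def blockStart: Long = current.get().start
      def blockLength: Long = current.get().length
    }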





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90774307
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }

-  test("input_file_name - FileScanRDD") {
+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)
-      val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name())
-        .head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = spark.read.parquet(dir.getCanonicalPath).select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
--- End diff --

Should we add `input_file_block_start()` and `input_file_block_length()` to `functions.scala` and use them here the same way as `input_file_name()`?
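
For reference, a minimal sketch of what such wrappers might look like, mirroring the existing input_file_name() helper in functions.scala. This is only an illustration of the suggestion, not code from this PR; the InputFileBlockStart and InputFileBlockLength expression classes are assumed from the PR, and the wrapping object name is hypothetical:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart}

    // Hypothetical Column wrappers; in functions.scala these would be plain defs.
    object InputFileBlockFunctions {
      // Start offset of the block being read, or -1 if not reading from a file.
      def input_file_block_start(): Column = new Column(InputFileBlockStart())

      // Length of the block being read, or -1 if not reading from a file.
      def input_file_block_length(): Column = new Column(InputFileBlockLength())
    }

As noted elsewhere in this thread, such wrappers were intentionally left out of the PR because it is unclear how commonly these expressions will be used.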





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90774314
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -567,10 +590,22 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
         classOf[LongWritable],
         classOf[Text])
       val df = rdd.map(pair => pair._2.toString).toDF()
-      val answer = df.select(input_file_name()).head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = df.select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
--- End diff --

ditto.





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90774313
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -533,31 +533,54 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }

-  test("input_file_name - FileScanRDD") {
+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)
-      val answer = spark.read.parquet(dir.getCanonicalPath).select(input_file_name())
-        .head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = spark.read.parquet(dir.getCanonicalPath).select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
+      val firstRow = q.head()
+      assert(firstRow.getString(0).contains(dir.getCanonicalPath))
+      assert(firstRow.getLong(1) == 0)
+      assert(firstRow.getLong(2) > 0)
+
+      // Now read directly from the original RDD without going through any files to make sure
+      // we are returning empty string, -1, and -1.
+      checkAnswer(
+        data.select(
+          input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()")
+        ).limit(1),
+        Row("", -1L, -1L))
     }
   }

-  test("input_file_name - HadoopRDD") {
+  test("input_file_name, input_file_block_start, input_file_block_length - HadoopRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF()
       data.write.text(dir.getCanonicalPath)
       val df = spark.sparkContext.textFile(dir.getCanonicalPath).toDF()
-      val answer = df.select(input_file_name()).head.getString(0)
-      assert(answer.contains(dir.getCanonicalPath))

-      checkAnswer(data.select(input_file_name()).limit(1), Row(""))
+      // Test the 3 expressions when reading from files
+      val q = df.select(
+        input_file_name(), expr("input_file_block_start()"), expr("input_file_block_length()"))
--- End diff --

ditto.





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-03 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16133#discussion_r90771895
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---
@@ -132,54 +132,57 @@ class NewHadoopRDD[K, V](

   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[NewHadoopPartition]
+      private val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf
+      private val conf = getConf

-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead

       // Sets the thread local variable for the file's name
--- End diff --

nit: Shall we update this comment together with the change?





[GitHub] spark pull request #16133: [SPARK-18702][SQL] input_file_block_start and inp...

2016-12-03 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/16133

[SPARK-18702][SQL] input_file_block_start and input_file_block_length

## What changes were proposed in this pull request?
We currently have the function input_file_name to get the path of the input file, but we don't have functions to get the block start offset and length. This patch introduces two functions:

1. input_file_block_start: returns the file block start offset, or -1 if not available.

2. input_file_block_length: returns the file block length, or -1 if not available.
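
For illustration, a small usage sketch based on the test changes in this PR. The Parquet path and SparkSession setup are placeholders; the new functions are referenced through expr() since they are exposed as SQL expressions:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{expr, input_file_name}

    val spark = SparkSession.builder().master("local[*]").appName("input-file-demo").getOrCreate()

    // Hypothetical Parquet path; any file-based read works.
    val df = spark.read.parquet("/tmp/example_table")

    // Both new expressions return -1 when the current row is not read from a file,
    // and input_file_name() returns "" in that case.
    df.select(
      input_file_name(),
      expr("input_file_block_start()"),
      expr("input_file_block_length()")
    ).show(truncate = false)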

## How was this patch tested?
Updated existing test cases in ColumnExpressionSuite that covered input_file_name to also cover the two new functions.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark SPARK-18702

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16133


commit 7713ebe2f2f2703544b964157f2ae60da48a22a0
Author: Reynold Xin 
Date:   2016-12-04T05:40:21Z

[SPARK-18702][SQL] input_file_block_start and input_file_block_length function



