sadikovi commented on code in PR #37228:
URL: https://github.com/apache/spark/pull/37228#discussion_r929325683


##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java:
##########
@@ -75,6 +75,11 @@ final class ParquetColumnVector {
     this.isPrimitive = column.isPrimitive();
 
     if (missingColumns.contains(column)) {
+      if (ParquetRowIndexUtil.isRowIndexColumn(column)) {

Review Comment:
   How expensive is this call?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -182,18 +183,32 @@ object FileFormat {
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
 
+  val ROW_INDEX = "row_index"
+
+  // A name for a temporary column that holds row indexes computed by the file 
format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
   val METADATA_NAME = "_metadata"
 
-  // supported metadata struct fields for hadoop fs relation
-  val METADATA_STRUCT: StructType = new StructType()
-    .add(StructField(FILE_PATH, StringType))
-    .add(StructField(FILE_NAME, StringType))
-    .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+  /** Schema of metadata struct that can be produced by every file format. */
+  val BASE_METADATA_STRUCT: StructType = new StructType()
+    .add(StructField(FileFormat.FILE_PATH, StringType))
+    .add(StructField(FileFormat.FILE_NAME, StringType))
+    .add(StructField(FileFormat.FILE_SIZE, LongType))
+    .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType))
 
-  // create a file metadata struct col
-  def createFileMetadataCol: AttributeReference =
-    FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+  /**
+   * Create a file metadata struct column containing fields supported by the 
given file format.
+   */
+  def createFileMetadataCol(fileFormat: FileFormat = null): AttributeReference 
= {

Review Comment:
   What happens if the row index column ends up being a partition column? How 
is it handled then? Is there a safeguard to make sure users cannot use it 
for partitioning?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala:
##########
@@ -70,7 +70,7 @@ case class StreamingRelation(dataSource: DataSource, 
sourceName: String, output:
         // filter out the metadata struct column if it has the name 
conflicting with output columns.
         // if the file has a column "_metadata",
         // then the data column should be returned not the metadata struct 
column
-        Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+        Seq(FileFormat.createFileMetadataCol()).filterNot(isOutputColumn)

Review Comment:
   Should this call also pass the file format?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+
+object RowIndexUtil {
+  def findColumnIndexInSchema(sparkSchema: StructType): Int = {

Review Comment:
   What is a column index? Or is it "column for row index"?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -220,10 +235,24 @@ object FileFormat {
           // the modificationTime from the file is in millisecond,
           // while internally, the TimestampType `file_modification_time` is 
stored in microsecond
           row.update(i, fileModificationTime * 1000L)
+        case ROW_INDEX =>
+          // Reserve the spot in the row for a LongType value. The metadata 
fields that have
+          // identical values for each row of the file are set by this 
function, while fields that
+          // have different values (such as row index) are set separately.
+          row.update(i, -1L)
       }
     }
     row
   }
+
+  /**
+   * Does the given metadata column always contain identical values for all 
rows originating from

Review Comment:
   nit: could this be rephrased as a statement instead? Thanks. Otherwise, 
people might confuse it with a TODO item.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala:
##########
@@ -50,7 +50,10 @@ case class ParquetScan(
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
     // so file should not be split across multiple tasks.
-    pushedAggregate.isEmpty
+    pushedAggregate.isEmpty &&
+    // SPARK-39634: Allow file splitting in combination with row index 
generation once

Review Comment:
   nit: looks like the comment is off. I guess it is for L56.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to