jaceklaskowski commented on code in PR #40334:
URL: https://github.com/apache/spark/pull/40334#discussion_r1144552053


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -945,98 +987,102 @@ object ColumnarReaderFactory extends 
PartitionReaderFactory {
   }
 }
 
-class PartitionAwareDataSource extends TestingV2Source {
+class MyScanBuilder(partitions: Array[InputPartition],
+                    partitionKeys: Option[Seq[String]],
+                    orderKeys: Option[Seq[String]] = None) extends 
SimpleScanBuilder
+  with SupportsReportPartitioning with SupportsReportOrdering {
 
-  class MyScanBuilder extends SimpleScanBuilder
-    with SupportsReportPartitioning {
+  override def planInputPartitions(): Array[InputPartition] = partitions
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // Note that we don't have same value of column `i` across partitions.
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
-    }
+  override def createReaderFactory(): PartitionReaderFactory = {
+    SpecificReaderFactory
+  }
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
-    }
+  override def outputPartitioning(): Partitioning =
+    partitionKeys.map(keys =>
+      new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 
partitions.length)
+    ).getOrElse(

Review Comment:
   nit: Replace `(` with `{` here, i.e. write `.getOrElse { ... }`, since the argument is a multi-line block.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -945,98 +987,102 @@ object ColumnarReaderFactory extends 
PartitionReaderFactory {
   }
 }
 
-class PartitionAwareDataSource extends TestingV2Source {
+class MyScanBuilder(partitions: Array[InputPartition],
+                    partitionKeys: Option[Seq[String]],
+                    orderKeys: Option[Seq[String]] = None) extends 
SimpleScanBuilder
+  with SupportsReportPartitioning with SupportsReportOrdering {
 
-  class MyScanBuilder extends SimpleScanBuilder
-    with SupportsReportPartitioning {
+  override def planInputPartitions(): Array[InputPartition] = partitions
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // Note that we don't have same value of column `i` across partitions.
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
-    }
+  override def createReaderFactory(): PartitionReaderFactory = {
+    SpecificReaderFactory
+  }
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
-    }
+  override def outputPartitioning(): Partitioning =
+    partitionKeys.map(keys =>
+      new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 
partitions.length)
+    ).getOrElse(
+      new UnknownPartitioning(partitions.length)
+    )
 
-    override def outputPartitioning(): Partitioning =
-      new KeyGroupedPartitioning(Array(FieldReference("i")), 2)
-  }
+  override def outputOrdering(): Array[SortOrder] = orderKeys.map(_.map(
+    new MySortOrder(_)
+  )).getOrElse(Seq.empty).toArray
+}
 
-  override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
-    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
-      new MyScanBuilder()
-    }
-  }
+class MySortOrder(columnName: String) extends SortOrder {
+  override def expression(): Expression = new MyIdentityTransform(
+    new MyNamedReference(columnName)
+  )
+  override def direction(): SortDirection = SortDirection.ASCENDING
+  override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
 }
 
-class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+class MyNamedReference(parts: String*) extends NamedReference {
+  override def fieldNames(): Array[String] = parts.toArray
+}
 
-  class MyScanBuilder(
-      val partitionKeys: Option[Seq[String]],
-      val orderKeys: Seq[String])
-    extends SimpleScanBuilder
-    with SupportsReportPartitioning with SupportsReportOrdering {
+class MyIdentityTransform(namedReference: NamedReference) extends Transform {
+  override def name(): String = "identity"
+  override def references(): Array[NamedReference] = Array.empty
+  override def arguments(): Array[Expression] = Seq(namedReference).toArray
+}
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // data are partitioned by column `i` or `j`, so we can report any 
partitioning
-      // column `i` is not ordered globally, but within partitions, together 
with`j`
-      // this allows us to report ordering by [i] and [i, j]
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2)))
-    }
+class PartitionAwareDataSource extends TestingV2Source {
+  // Note that we don't have same value of column `i` across partitions.
+  val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
+      new SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2))
+    )
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
+  override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+      new MyScanBuilder(partitions, Some(Seq("i")))
     }
+  }
+}
 
-    override def outputPartitioning(): Partitioning = {
-      partitionKeys.map(keys =>
-        new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 2)
-      ).getOrElse(
-        new UnknownPartitioning(2)
-      )
-    }
+class PartitionAwareDataSourceWithKey extends PartitionAwareDataSource {
+  // Note that we don't have same value of column `i` across partitions.
+  // Note that we have only a single value for column `i` per partition.
+  override val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartitionWithKey(Array(1, 1), Array(4, 4)),
+      new SpecificInputPartitionWithKey(Array(3), Array(6)),
+      new SpecificInputPartitionWithKey(Array(2), Array(6)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2))
+    )
+}
 
-    override def outputOrdering(): Array[SortOrder] = orderKeys.map(
-      new MySortOrder(_)
-    ).toArray
-  }
+class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+  // data are partitioned by column `i` or `j`, so we can report any 
partitioning
+  // column `i` is not ordered globally, but within partitions, together 
with`j`
+  // this allows us to report ordering by [i] and [i, j]
+  override val partitions: Array[InputPartition] =
+  Array(
+    new SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
+    new SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2))
+  )
 
   override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
     override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
       new MyScanBuilder(
+        partitions,
         Option(options.get("partitionKeys")).map(_.split(",")),
-        
Option(options.get("orderKeys")).map(_.split(",").toSeq).getOrElse(Seq.empty)
+        Option(options.get("orderKeys")).map(_.split(",").toSeq)
       )
     }
   }
-
-  class MySortOrder(columnName: String) extends SortOrder {
-    override def expression(): Expression = new MyIdentityTransform(
-      new MyNamedReference(columnName)
-    )
-    override def direction(): SortDirection = SortDirection.ASCENDING
-    override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
-  }
-
-  class MyNamedReference(parts: String*) extends NamedReference {
-    override def fieldNames(): Array[String] = parts.toArray
-  }
-
-  class MyIdentityTransform(namedReference: NamedReference) extends Transform {
-    override def name(): String = "identity"
-    override def references(): Array[NamedReference] = Array.empty
-    override def arguments(): Array[Expression] = Seq(namedReference).toArray
-  }
 }
 
-case class SpecificInputPartition(
-    i: Array[Int],
-    j: Array[Int]) extends InputPartition with HasPartitionKey {
+class SpecificInputPartition(val i: Array[Int], val j: Array[Int]) extends 
InputPartition
+
+class SpecificInputPartitionWithKey(i: Array[Int], j: Array[Int])
+  extends SpecificInputPartition(i, j) with HasPartitionKey {
+  assert(i.nonEmpty)
+  assert(i.forall(i.head == _))

Review Comment:
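   Could this be `assert(i.distinct.size == 1)` instead? It would cover both the non-empty check and the all-values-equal check in a single assertion.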



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -98,7 +98,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         case Some(exprs) if KeyGroupedPartitioning.supportsExpressions(exprs) 
=>
           groupedPartitions.map { partitionValues =>
             KeyGroupedPartitioning(exprs, partitionValues.size, 
partitionValues.map(_._1))
-          }.getOrElse(super.outputPartitioning)
+          }.getOrElse(

Review Comment:
   nit: Replace `(` with `{` here, i.e. write `.getOrElse { ... }`, since the argument is a multi-line block.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -945,98 +987,102 @@ object ColumnarReaderFactory extends 
PartitionReaderFactory {
   }
 }
 
-class PartitionAwareDataSource extends TestingV2Source {
+class MyScanBuilder(partitions: Array[InputPartition],
+                    partitionKeys: Option[Seq[String]],
+                    orderKeys: Option[Seq[String]] = None) extends 
SimpleScanBuilder
+  with SupportsReportPartitioning with SupportsReportOrdering {
 
-  class MyScanBuilder extends SimpleScanBuilder
-    with SupportsReportPartitioning {
+  override def planInputPartitions(): Array[InputPartition] = partitions
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // Note that we don't have same value of column `i` across partitions.
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
-    }
+  override def createReaderFactory(): PartitionReaderFactory = {
+    SpecificReaderFactory
+  }
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
-    }
+  override def outputPartitioning(): Partitioning =
+    partitionKeys.map(keys =>
+      new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 
partitions.length)
+    ).getOrElse(
+      new UnknownPartitioning(partitions.length)
+    )
 
-    override def outputPartitioning(): Partitioning =
-      new KeyGroupedPartitioning(Array(FieldReference("i")), 2)
-  }
+  override def outputOrdering(): Array[SortOrder] = orderKeys.map(_.map(
+    new MySortOrder(_)
+  )).getOrElse(Seq.empty).toArray
+}
 
-  override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
-    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
-      new MyScanBuilder()
-    }
-  }
+class MySortOrder(columnName: String) extends SortOrder {
+  override def expression(): Expression = new MyIdentityTransform(
+    new MyNamedReference(columnName)
+  )
+  override def direction(): SortDirection = SortDirection.ASCENDING
+  override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
 }
 
-class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+class MyNamedReference(parts: String*) extends NamedReference {
+  override def fieldNames(): Array[String] = parts.toArray
+}
 
-  class MyScanBuilder(
-      val partitionKeys: Option[Seq[String]],
-      val orderKeys: Seq[String])
-    extends SimpleScanBuilder
-    with SupportsReportPartitioning with SupportsReportOrdering {
+class MyIdentityTransform(namedReference: NamedReference) extends Transform {
+  override def name(): String = "identity"
+  override def references(): Array[NamedReference] = Array.empty
+  override def arguments(): Array[Expression] = Seq(namedReference).toArray
+}
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // data are partitioned by column `i` or `j`, so we can report any 
partitioning
-      // column `i` is not ordered globally, but within partitions, together 
with`j`
-      // this allows us to report ordering by [i] and [i, j]
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2)))
-    }
+class PartitionAwareDataSource extends TestingV2Source {
+  // Note that we don't have same value of column `i` across partitions.
+  val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
+      new SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2))
+    )
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
+  override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+      new MyScanBuilder(partitions, Some(Seq("i")))
     }
+  }
+}
 
-    override def outputPartitioning(): Partitioning = {
-      partitionKeys.map(keys =>
-        new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 2)
-      ).getOrElse(
-        new UnknownPartitioning(2)
-      )
-    }
+class PartitionAwareDataSourceWithKey extends PartitionAwareDataSource {
+  // Note that we don't have same value of column `i` across partitions.
+  // Note that we have only a single value for column `i` per partition.
+  override val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartitionWithKey(Array(1, 1), Array(4, 4)),
+      new SpecificInputPartitionWithKey(Array(3), Array(6)),
+      new SpecificInputPartitionWithKey(Array(2), Array(6)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2))
+    )
+}
 
-    override def outputOrdering(): Array[SortOrder] = orderKeys.map(
-      new MySortOrder(_)
-    ).toArray
-  }
+class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+  // data are partitioned by column `i` or `j`, so we can report any 
partitioning
+  // column `i` is not ordered globally, but within partitions, together 
with`j`
+  // this allows us to report ordering by [i] and [i, j]
+  override val partitions: Array[InputPartition] =
+  Array(
+    new SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
+    new SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2))
+  )
 
   override def getTable(options: CaseInsensitiveStringMap): Table = new 
SimpleBatchTable {
     override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
       new MyScanBuilder(
+        partitions,
         Option(options.get("partitionKeys")).map(_.split(",")),
-        
Option(options.get("orderKeys")).map(_.split(",").toSeq).getOrElse(Seq.empty)
+        Option(options.get("orderKeys")).map(_.split(",").toSeq)
       )
     }
   }
-
-  class MySortOrder(columnName: String) extends SortOrder {
-    override def expression(): Expression = new MyIdentityTransform(
-      new MyNamedReference(columnName)
-    )
-    override def direction(): SortDirection = SortDirection.ASCENDING
-    override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
-  }
-
-  class MyNamedReference(parts: String*) extends NamedReference {
-    override def fieldNames(): Array[String] = parts.toArray
-  }
-
-  class MyIdentityTransform(namedReference: NamedReference) extends Transform {
-    override def name(): String = "identity"
-    override def references(): Array[NamedReference] = Array.empty
-    override def arguments(): Array[Expression] = Seq(namedReference).toArray
-  }
 }
 
-case class SpecificInputPartition(
-    i: Array[Int],
-    j: Array[Int]) extends InputPartition with HasPartitionKey {
+class SpecificInputPartition(val i: Array[Int], val j: Array[Int]) extends 
InputPartition
+
+class SpecificInputPartitionWithKey(i: Array[Int], j: Array[Int])
+  extends SpecificInputPartition(i, j) with HasPartitionKey {
+  assert(i.nonEmpty)
+  assert(i.forall(i.head == _))

Review Comment:
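   Could this be `assert(i.distinct.size == 1)` instead? It would cover both the non-empty check and the all-values-equal check in a single assertion.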



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to