Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9453#discussion_r44103334
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -375,53 +375,135 @@ class CachedTableSuite extends QueryTest with
SharedSQLContext {
sql("SELECT key, count(*) FROM orderedTable GROUP BY key ORDER BY
key"),
sql("SELECT key, count(*) FROM testData3x GROUP BY key ORDER BY
key").collect())
sqlContext.uncacheTable("orderedTable")
+ sqlContext.dropTempTable("orderedTable")
// Set up two tables distributed in the same way. Try this with the
data distributed into
// different number of partitions.
for (numPartitions <- 1 until 10 by 4) {
- testData.repartition(numPartitions, $"key").registerTempTable("t1")
- testData2.repartition(numPartitions, $"a").registerTempTable("t2")
+ withTempTable("t1", "t2") {
+ testData.repartition(numPartitions, $"key").registerTempTable("t1")
+ testData2.repartition(numPartitions, $"a").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
+
+ // Joining them should result in no exchanges.
+ verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key =
t2.a"), 0)
+ checkAnswer(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"),
+ sql("SELECT * FROM testData t1 JOIN testData2 t2 ON t1.key =
t2.a"))
+
+ // Grouping on the partition key should result in no exchanges
+ verifyNumExchanges(sql("SELECT count(*) FROM t1 GROUP BY key"), 0)
+ checkAnswer(sql("SELECT count(*) FROM t1 GROUP BY key"),
+ sql("SELECT count(*) FROM testData GROUP BY key"))
+
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ }
+ }
+
+ // Distribute the tables into non-matching number of partitions. Need
to shuffle one side.
+ withTempTable("t1", "t2") {
+ testData.repartition(6, $"key").registerTempTable("t1")
+ testData2.repartition(3, $"a").registerTempTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")
- // Joining them should result in no exchanges.
- verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key =
t2.a"), 0)
- checkAnswer(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"),
- sql("SELECT * FROM testData t1 JOIN testData2 t2 ON t1.key =
t2.a"))
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON
t1.key = t2.a")
+ verifyNumExchanges(query, 1)
+
assert(query.queryExecution.executedPlan.outputPartitioning.numPartitions === 6)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ }
- // Grouping on the partition key should result in no exchanges
- verifyNumExchanges(sql("SELECT count(*) FROM t1 GROUP BY key"), 0)
- checkAnswer(sql("SELECT count(*) FROM t1 GROUP BY key"),
- sql("SELECT count(*) FROM testData GROUP BY key"))
+ // One side of join is not partitioned in the desired way. Need to
shuffle one side.
+ withTempTable("t1", "t2") {
+ testData.repartition(6, $"value").registerTempTable("t1")
+ testData2.repartition(6, $"a").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON
t1.key = t2.a")
+ verifyNumExchanges(query, 1)
+
assert(query.queryExecution.executedPlan.outputPartitioning.numPartitions === 6)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
sqlContext.uncacheTable("t1")
sqlContext.uncacheTable("t2")
- sqlContext.dropTempTable("t1")
- sqlContext.dropTempTable("t2")
}
- // Distribute the tables into non-matching number of partitions. Need
to shuffle.
- testData.repartition(6, $"key").registerTempTable("t1")
- testData2.repartition(3, $"a").registerTempTable("t2")
- sqlContext.cacheTable("t1")
- sqlContext.cacheTable("t2")
+ withTempTable("t1", "t2") {
+ testData.repartition(6, $"value").registerTempTable("t1")
+ testData2.repartition(12, $"a").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
- verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key =
t2.a"), 2)
- sqlContext.uncacheTable("t1")
- sqlContext.uncacheTable("t2")
- sqlContext.dropTempTable("t1")
- sqlContext.dropTempTable("t2")
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON
t1.key = t2.a")
+ verifyNumExchanges(query, 1)
+
assert(query.queryExecution.executedPlan.outputPartitioning.numPartitions ===
12)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ }
- // One side of join is not partitioned in the desired way. Need to
shuffle.
- testData.repartition(6, $"value").registerTempTable("t1")
- testData2.repartition(6, $"a").registerTempTable("t2")
- sqlContext.cacheTable("t1")
- sqlContext.cacheTable("t2")
+ // One side of join is not partitioned in the desired way. Since the
number of partitions of
+ // the side that has already partitioned is smaller than the side that
is not partitioned,
+ // we shuffle both side.
+ withTempTable("t1", "t2") {
+ testData.repartition(6, $"value").registerTempTable("t1")
+ testData2.repartition(3, $"a").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
- verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key =
t2.a"), 2)
- sqlContext.uncacheTable("t1")
- sqlContext.uncacheTable("t2")
- sqlContext.dropTempTable("t1")
- sqlContext.dropTempTable("t2")
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON
t1.key = t2.a")
+ verifyNumExchanges(query, 2)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ }
+
+ // repartition's column ordering is different from group by column
ordering.
+ // But they use the same set of columns.
+ withTempTable("t1") {
+ testData.repartition(6, $"value", $"key").registerTempTable("t1")
+ sqlContext.cacheTable("t1")
+
+ val query = sql("SELECT value, key from t1 group by key, value")
+ verifyNumExchanges(query, 0)
+ checkAnswer(
+ query,
+ testData.distinct().select($"value", $"key"))
+ sqlContext.uncacheTable("t1")
+ }
+
+ // repartition's column ordering is different from join condition's
column ordering.
+ // We will still shuffle because hashcodes of a row depend on the
column ordering.
+ // If we do not shuffle, we may actually partition two tables in
totally two different way.
+ // See PartitioningSuite for more details.
+ withTempTable("t1", "t2") {
+ val df1 = testData
+ df1.repartition(6, $"value", $"key").registerTempTable("t1")
+ val df2 = testData2.select($"a", $"b".cast("string"))
+ df2.repartition(6, $"a", $"b").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
+
+ val query =
+ sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key =
t2.a and t1.value = t2.b")
+ verifyNumExchanges(query, 1)
+
assert(query.queryExecution.executedPlan.outputPartitioning.numPartitions === 6)
+ checkAnswer(
+ query,
+ df1.join(df2, $"key" === $"a" && $"value" === $"b").select($"key",
$"value", $"a", $"b"))
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ }
--- End diff --
@nongli I added two test cases.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]