[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-05-18 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r426953544



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -310,8 +310,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
   test("ShuffledHashJoin metrics") {
 withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
-SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {

Review comment:
   nit: plz avoid unnecessary changes where possible.








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416990708



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   withTempView("testDataForJoin") {
 // Assume the execution plan is
 // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
   "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-Seq(false, true).foreach { enableWholeStage =>
-  val df = spark.sql(query)
-  testSparkPlanMetrics(df, 2, Map(
-0L -> (("BroadcastNestedLoopJoin", Map(
-  "number of output rows" -> 12L,
-enableWholeStage
-  )
-}
+val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+  "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+  .foreach { case (query, enableWholeStage) =>
+val df = spark.sql(query)
+testSparkPlanMetrics(df, 2, Map(
+  0L -> (("BroadcastNestedLoopJoin", Map(
+"number of output rows" -> 12L,
+  enableWholeStage
+)
+  }

Review comment:
   nit: format (wrong indents)
   ```
   Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
 .foreach { case (query, enableWholeStage) =>
 val df = spark.sql(query)
 testSparkPlanMetrics(df, 2, Map(
   0L -> (("BroadcastNestedLoopJoin", Map(
 "number of output rows" -> 12L,
   enableWholeStage
 )
   }
   ```








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416990708



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   withTempView("testDataForJoin") {
 // Assume the execution plan is
 // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
   "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-Seq(false, true).foreach { enableWholeStage =>
-  val df = spark.sql(query)
-  testSparkPlanMetrics(df, 2, Map(
-0L -> (("BroadcastNestedLoopJoin", Map(
-  "number of output rows" -> 12L,
-enableWholeStage
-  )
-}
+val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+  "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+  .foreach { case (query, enableWholeStage) =>
+val df = spark.sql(query)
+testSparkPlanMetrics(df, 2, Map(
+  0L -> (("BroadcastNestedLoopJoin", Map(
+"number of output rows" -> 12L,
+  enableWholeStage
+)
+  }

Review comment:
   nit: format (wrong indents)
   ```
   Seq((leftQuery, false),
 (rightQuery, false),
 (leftQuery, true),
 (rightQuery, true)).foreach { case (query, enableWholeStage) =>
 val df = spark.sql(query)
 testSparkPlanMetrics(df, 2, Map(
   0L -> (("BroadcastNestedLoopJoin", Map(
 "number of output rows" -> 12L,
   enableWholeStage
 )
   }
   ```








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416990708



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   withTempView("testDataForJoin") {
 // Assume the execution plan is
 // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
   "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-Seq(false, true).foreach { enableWholeStage =>
-  val df = spark.sql(query)
-  testSparkPlanMetrics(df, 2, Map(
-0L -> (("BroadcastNestedLoopJoin", Map(
-  "number of output rows" -> 12L,
-enableWholeStage
-  )
-}
+val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+  "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+  .foreach { case (query, enableWholeStage) =>
+val df = spark.sql(query)
+testSparkPlanMetrics(df, 2, Map(
+  0L -> (("BroadcastNestedLoopJoin", Map(
+"number of output rows" -> 12L,
+  enableWholeStage
+)
+  }

Review comment:
   nit: format (wrong indents)
   ```
   Seq(
 (leftQuery, false),
 (rightQuery, false),
 (leftQuery, true),
 (rightQuery, true)).foreach { case (query, enableWholeStage) =>
 val df = spark.sql(query)
 testSparkPlanMetrics(df, 2, Map(
   0L -> (("BroadcastNestedLoopJoin", Map(
 "number of output rows" -> 12L,
   enableWholeStage
 )
   }
   ```








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416217721



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -394,6 +420,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("BroadcastLeftAntiJoinHash metrics") {
+val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+// Assume the execution plan is
+// ... -> BroadcastHashJoin(nodeId = 1)

Review comment:
   Do we need this comment? I think the code below is clear without it.








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416217018



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   // +- LocalTableScan(nodeId = 7)
   Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
 case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-val df = df1.join(df2, "key")
-testSparkPlanMetrics(df, 1, Map(
-  nodeId1 -> (("ShuffledHashJoin", Map(
-"number of output rows" -> 2L))),
-  nodeId2 -> (("Exchange", Map(
-"shuffle records written" -> 2L,
-"records read" -> 2L))),
-  nodeId3 -> (("Exchange", Map(
-"shuffle records written" -> 10L,
-"records read" -> 10L,
-  enableWholeStage
-)
+  val df = df1.join(df2, "key")
+  testSparkPlanMetrics(df, 1, Map(
+nodeId1 -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 2L))),
+nodeId2 -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
+nodeId3 -> (("Exchange", Map(
+  "shuffle records written" -> 10L,
+  "records read" -> 10L,
+enableWholeStage
+  )
   }
 }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+  Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+(0L, "left_outer", rightDf, leftDf, 10L, false),
+(0L, "right_outer", leftDf, rightDf, 10L, true),
+(0L, "left_outer", rightDf, leftDf, 10L, true),
+(2L, "left_anti", rightDf, leftDf, 8L, true),
+(2L, "left_semi", rightDf, leftDf, 2L, true),
+(1L, "left_anti", rightDf, leftDf, 8L, false),
+(1L, "left_semi", rightDf, leftDf, 2L, false))
+.foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+  val df = leftDf.hint("shuffle_hash").join(
+rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+  testSparkPlanMetrics(df, 1, Map(
+nodeId -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> rows,
+enableWholeStage
+  )
+}
+}
+  }
+
   test("BroadcastHashJoin(outer) metrics") {
 val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
 val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
 // Assume the execution plan is
 // ... -> BroadcastHashJoin(nodeId = 0)
-Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
-  ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
-  case (joinType, nodeId, numRows, enableWholeStage) =>
+Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
+  ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) =>
   val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
   testSparkPlanMetrics(df, 2, Map(
-nodeId -> (("BroadcastHashJoin", Map(
-  "number of output rows" -> numRows,
+nodeId -> (("BroadcastHashJoin", Map("number of output rows" -> numRows,

Review comment:
   unnecessary changes?








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416216579



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   // +- LocalTableScan(nodeId = 7)
   Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
 case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-val df = df1.join(df2, "key")
-testSparkPlanMetrics(df, 1, Map(
-  nodeId1 -> (("ShuffledHashJoin", Map(
-"number of output rows" -> 2L))),
-  nodeId2 -> (("Exchange", Map(
-"shuffle records written" -> 2L,
-"records read" -> 2L))),
-  nodeId3 -> (("Exchange", Map(
-"shuffle records written" -> 10L,
-"records read" -> 10L,
-  enableWholeStage
-)
+  val df = df1.join(df2, "key")
+  testSparkPlanMetrics(df, 1, Map(
+nodeId1 -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 2L))),
+nodeId2 -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
+nodeId3 -> (("Exchange", Map(
+  "shuffle records written" -> 10L,
+  "records read" -> 10L,
+enableWholeStage
+  )
   }
 }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   Do we still need this setting, even though the hint is used?








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416216164



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   // +- LocalTableScan(nodeId = 7)
   Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
 case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-val df = df1.join(df2, "key")
-testSparkPlanMetrics(df, 1, Map(
-  nodeId1 -> (("ShuffledHashJoin", Map(
-"number of output rows" -> 2L))),
-  nodeId2 -> (("Exchange", Map(
-"shuffle records written" -> 2L,
-"records read" -> 2L))),
-  nodeId3 -> (("Exchange", Map(
-"shuffle records written" -> 10L,
-"records read" -> 10L,
-  enableWholeStage
-)
+  val df = df1.join(df2, "key")
+  testSparkPlanMetrics(df, 1, Map(
+nodeId1 -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 2L))),
+nodeId2 -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
+nodeId3 -> (("Exchange", Map(
+  "shuffle records written" -> 10L,
+  "records read" -> 10L,
+enableWholeStage
+  )
   }
 }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {

Review comment:
   nit: `(left, outer)`








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416215945



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   // +- LocalTableScan(nodeId = 7)
   Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
 case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-val df = df1.join(df2, "key")
-testSparkPlanMetrics(df, 1, Map(
-  nodeId1 -> (("ShuffledHashJoin", Map(
-"number of output rows" -> 2L))),
-  nodeId2 -> (("Exchange", Map(
-"shuffle records written" -> 2L,
-"records read" -> 2L))),
-  nodeId3 -> (("Exchange", Map(
-"shuffle records written" -> 10L,
-"records read" -> 10L,
-  enableWholeStage
-)
+  val df = df1.join(df2, "key")
+  testSparkPlanMetrics(df, 1, Map(
+nodeId1 -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 2L))),
+nodeId2 -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
+nodeId3 -> (("Exchange", Map(
+  "shuffle records written" -> 10L,
+  "records read" -> 10L,
+enableWholeStage

Review comment:
   unnecessary changes?








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415434701



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,36 +325,61 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
   // +- LocalTableScan(nodeId = 7)
   Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
 case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-val df = df1.join(df2, "key")
-testSparkPlanMetrics(df, 1, Map(
-  nodeId1 -> (("ShuffledHashJoin", Map(
-"number of output rows" -> 2L))),
-  nodeId2 -> (("Exchange", Map(
-"shuffle records written" -> 2L,
-"records read" -> 2L))),
-  nodeId3 -> (("Exchange", Map(
-"shuffle records written" -> 10L,
-"records read" -> 10L,
-  enableWholeStage
-)
+  val df = df1.join(df2, "key")
+  testSparkPlanMetrics(df, 1, Map(
+nodeId1 -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 2L))),
+nodeId2 -> (("Exchange", Map(
+  "shuffle records written" -> 2L,
+  "records read" -> 2L))),
+nodeId3 -> (("Exchange", Map(
+  "shuffle records written" -> 10L,
+  "records read" -> 10L,
+enableWholeStage
+  )
   }
 }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+  Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+(0L, "left_outer", rightDf, leftDf, 10L, false),
+(0L, "right_outer", leftDf, rightDf, 10L, true),
+(0L, "left_outer", rightDf, leftDf, 10L, true),
+(2L, "left_anti", rightDf, leftDf, 8L, true),
+(2L, "left_semi", rightDf, leftDf, 2L, true),
+(1L, "left_anti", rightDf, leftDf, 8L, false),
+(1L, "left_semi", rightDf, leftDf, 2L, false))
+.foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+  val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+  testSparkPlanMetrics(df, 1, Map(
+nodeId -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> rows,
+enableWholeStage
+  )
+}
+}
+  }
+
   test("BroadcastHashJoin(outer) metrics") {
 val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
 val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
 // Assume the execution plan is
 // ... -> BroadcastHashJoin(nodeId = 0)
-Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
-  ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
+Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
+  ("right_outer", 1L, 6L, true)).foreach {
   case (joinType, nodeId, numRows, enableWholeStage) =>
-  val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
-  testSparkPlanMetrics(df, 2, Map(
-nodeId -> (("BroadcastHashJoin", Map(
-  "number of output rows" -> numRows,
-enableWholeStage
-  )
+val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
+testSparkPlanMetrics(df, 2, Map(
+  nodeId -> (("BroadcastHashJoin", Map(
+"number of output rows" -> numRows,
+  enableWholeStage
+)

Review comment:
   nit: wrong indents








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415434617



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -547,9 +590,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
 def checkFilterAndRangeMetrics(
-df: DataFrame,
-filterNumOutputs: Int,
-rangeNumOutputs: Int): Unit = {
+df: DataFrame,
+filterNumOutputs: Int,
+rangeNumOutputs: Int): Unit = {

Review comment:
   plz revert this.








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415434556



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   ditto: https://github.com/apache/spark/pull/28330#discussion_r415434157








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415434157



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",

Review comment:
   How about using a hint to control join physical plans?
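
   For context, a minimal sketch of what a hint-based variant could look like (assuming a test mixing in `SharedSparkSession` with `testImplicits._` imported, as elsewhere in this suite, and adaptive execution left at its default of off; the assertion just inspects the physical plan rather than going through the metrics helper):
   ```
   import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

   // Sketch only: force a ShuffledHashJoin through a join hint instead of
   // tuning AUTO_BROADCASTJOIN_THRESHOLD / PREFER_SORTMERGEJOIN.
   val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
   val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")

   val df = leftDf.hint("shuffle_hash").join(rightDf, $"key" === $"key2", "right_outer")

   // Sanity check that the hint took effect and the planner picked ShuffledHashJoin.
   val hashJoins = df.queryExecution.executedPlan.collect {
     case j: ShuffledHashJoinExec => j
   }
   assert(hashJoins.nonEmpty)
   ```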








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415434157



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",

Review comment:
   How about using a hint?








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-25 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415179658



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+  Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false),
+("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true))
+.foreach { case (joinType, nodeId, df1, df2, enableWholeStage) =>
+  val df = df1.join(df2, $"key" === $"key2", joinType)
+  testSparkPlanMetrics(df, 1, Map(
+nodeId -> (("ShuffledHashJoin", Map(
+  "number of output rows" -> 10L,
+enableWholeStage
+  )
+}
+}
+  }
+
+  test("ShuffledHashJoin(left-anti) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+  Seq((2L, true), (1L, false)).foreach { case (nodeId, enableWholeStage) =>
+val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_anti")
+testSparkPlanMetrics(df, 1, Map(
+  nodeId -> (("ShuffledHashJoin", Map(
+"number of output rows" -> 8L,
+  enableWholeStage
+)
+  }
+}
+  }
+
+  test("ShuffledHashJoin(left-semi) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+  Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
+val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_semi")
+testSparkPlanMetrics(df, 1, Map(
+  nodeId -> (("ShuffledHashJoin", Map(
+"number of output rows" -> 2L,

Review comment:
   Do we need to split this join test into three parts? It seems only the metric value differs between them.
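
   For illustration, a merged, table-driven version might look roughly like the sketch below (mirroring the shape of the snippets above; `testSparkPlanMetrics` is the suite's existing helper, and the node ids and expected row counts are the ones from those snippets, so they would need re-checking against the actual plans):
   ```
   val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
   val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")

   // (nodeId, joinType, left side, right side, expected "number of output rows", whole-stage codegen)
   Seq(
     (0L, "right_outer", leftDf, rightDf, 10L, false),
     (0L, "left_outer", rightDf, leftDf, 10L, false),
     (1L, "left_anti", rightDf, leftDf, 8L, false),
     (1L, "left_semi", rightDf, leftDf, 2L, false)
   ).foreach { case (nodeId, joinType, l, r, rows, enableWholeStage) =>
     val df = l.hint("shuffle_hash").join(r.hint("shuffle_hash"), $"key" === $"key2", joinType)
     testSparkPlanMetrics(df, 1, Map(
       nodeId -> (("ShuffledHashJoin", Map("number of output rows" -> rows)))),
       enableWholeStage)
   }
   ```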








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-25 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415178719



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   Do we need to set this value for the test?








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-25 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415178596



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+  val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+  val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+  Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false),
+("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true))
+.foreach { case (joinType, nodeId, df1, df2, enableWholeStage) =>

Review comment:
   nit: `df1` -> `leftDf` and `df2` -> `rightDf`








[GitHub] [spark] maropu commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-25 Thread GitBox


maropu commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415178386



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",

Review comment:
   `40` -> `-1`?
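
   For reference, a sketch of the `-1` variant in the same `withSQLConf` style used above; `-1` disables broadcast joins outright, so the test would not depend on the estimated size of the small side staying above an arbitrary 40-byte cut-off:
   ```
   withSQLConf(
     // -1 turns broadcast joins off entirely instead of relying on a small byte threshold.
     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
     SQLConf.SHUFFLE_PARTITIONS.key -> "2",
     SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
     // ... build df1/df2 and run testSparkPlanMetrics exactly as in the snippet above ...
   }
   ```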




