xuanyuanking commented on a change in pull request #29724:
URL: https://github.com/apache/spark/pull/29724#discussion_r487229420



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Nice catch,
   nit: let's remove the duplicate error string in 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR168
 and 
https://github.com/apache/spark/pull/29724/files#diff-e9db271d8593f070ba7096e758c8b89dR162

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,13 +244,14 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here 
is how it is done.
     //
-    //  - `leftJoiner.joinWith(rightJoiner)` generates all rows from matching 
new left input with

Review comment:
       I think the original PR just wants to use pseudocode to explain, either 
way is ok to me.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -189,11 +189,9 @@ case class StreamingSymmetricHashJoinExec(
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
-    case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
-    case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case _ => throwBadJoinTypeException()

Review comment:
       Yes, have a string val for the same error message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to