Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


uros-db commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1604352238


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteCollationJoin.scala:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CollationKey, EqualNullSafe, EqualTo, Lower}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.StringType
+
+object RewriteCollationJoin extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
+    case j @ Join(_, _, _, Some(condition), _) =>
+      val newCondition = condition transform {
+        case EqualTo(l: AttributeReference, r: AttributeReference) =>
+          (l.dataType, r.dataType) match {
+            case (st: StringType, _: StringType) =>

Review Comment:
   Edit: updated the PR title, description, and corresponding Jira ticket title to reflect this.
   Also created a new ticket to handle complex types (ArrayType, StructType, etc.): https://issues.apache.org/jira/browse/SPARK-48318
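The follow-up idea here, extending collation keys to nested types, can be sketched outside Spark. The snippet below is a hypothetical illustration in plain Java using `java.text.Collator` as a stand-in for Spark's `CollationFactory` (the class and method names are invented for this sketch, not Spark's implementation): an "array of strings" gets its key element-wise, so two arrays are key-equal iff they are element-wise equal under the collation.

```java
import java.text.Collator;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class NestedCollationKeySketch {
    // Case-insensitive collator standing in for a non-binary collation.
    private static final Collator COLLATOR = Collator.getInstance(Locale.ROOT);
    static { COLLATOR.setStrength(Collator.SECONDARY); }

    // Collation key for one string, encoded so keys compare with equals():
    // strings equal under the collation yield identical keys.
    static String keyOf(String s) {
        return Base64.getEncoder().encodeToString(COLLATOR.getCollationKey(s).toByteArray());
    }

    // Key for an array of strings, derived element-wise.
    static List<String> keyOfArray(List<String> arr) {
        return arr.stream().map(NestedCollationKeySketch::keyOf).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Equal element-wise under the case-insensitive collation.
        System.out.println(keyOfArray(List.of("AA", "bb")).equals(keyOfArray(List.of("aa", "BB")))); // true
        // Genuinely different elements produce different keys.
        System.out.println(keyOfArray(List.of("aa")).equals(keyOfArray(List.of("ab")))); // false
    }
}
```

A struct of strings could be handled the same way, one key per string field, which is presumably the direction SPARK-48318 would take.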



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


uros-db commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1604346112


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteCollationJoin.scala:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CollationKey, EqualNullSafe, EqualTo, Lower}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.StringType
+
+object RewriteCollationJoin extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
+    case j @ Join(_, _, _, Some(condition), _) =>
+      val newCondition = condition transform {
+        case EqualTo(l: AttributeReference, r: AttributeReference) =>
+          (l.dataType, r.dataType) match {
+            case (st: StringType, _: StringType) =>

Review Comment:
   Will add support for complex types in a future PR; for now, only String is supported.






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


cloud-fan commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603420522


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteCollationJoin.scala:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CollationKey, EqualNullSafe, EqualTo, Lower}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.StringType
+
+object RewriteCollationJoin extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
+    case j @ Join(_, _, _, Some(condition), _) =>
+      val newCondition = condition transform {
+        case EqualTo(l: AttributeReference, r: AttributeReference) =>
+          (l.dataType, r.dataType) match {
+            case (st: StringType, _: StringType) =>

Review Comment:
   How about nested string types, such as array of string or struct of string?






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603381135


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -1051,6 +1052,153 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
 }
   }
 
+  test("CollationKey generates correct collation key for string") {

Review Comment:
   I think that these tests should be in `CollationExpressionSuite`.






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603377917


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -784,19 +785,19 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   case _: BroadcastHashJoinExec => ()
 }.nonEmpty)
 
-// Even with hint broadcast, hash join is not used for non-binary collated strings.
+// Hash join is also used for non-binary collated strings.

Review Comment:
   We should assert that `CollationKey` was injected into the physical plan.






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603376505


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -784,19 +785,19 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   case _: BroadcastHashJoinExec => ()
 }.nonEmpty)
 
-// Even with hint broadcast, hash join is not used for non-binary collated strings.
+// Hash join is also used for non-binary collated strings.

Review Comment:
   I think you should rename the test. It's currently called `test("hash based joins not allowed for non-binary collated strings")`, which is no longer accurate.






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603374449


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala:
##
@@ -397,7 +398,11 @@ trait JoinSelectionHelper extends Logging {
 
   protected def hashJoinSupported
   (leftKeys: Seq[Expression], rightKeys: Seq[Expression]): Boolean = {
-    val result = leftKeys.concat(rightKeys).forall(e => UnsafeRowUtils.isBinaryStable(e.dataType))
+    def hasStableCollationKey(e: Expression): Boolean =
+      e.dataType.isInstanceOf[StringType] || e.dataType.isInstanceOf[BinaryType]
+    def isHashJoinSupported(e: Expression): Boolean = UnsafeRowUtils.isBinaryStable(e.dataType) ||

Review Comment:
   Why do you need this `hasStableCollationKey`? `isBinaryStable` will return true for binary, so things should just work, right?
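For readers following the thread: a type is "binary stable" when two values compare equal under its collation iff their encoded bytes are identical, which is the property that lets hash joins reuse plain byte equality. A minimal stand-alone illustration in plain Java (this uses `java.text.Collator` as an assumed stand-in; it is not Spark's `UnsafeRowUtils`):

```java
import java.nio.charset.StandardCharsets;
import java.text.Collator;
import java.util.Arrays;
import java.util.Locale;

public class BinaryStableSketch {
    // Byte-level equality of the UTF-8 encodings.
    static boolean binaryEqual(String a, String b) {
        return Arrays.equals(a.getBytes(StandardCharsets.UTF_8),
                             b.getBytes(StandardCharsets.UTF_8));
    }

    // Equality as defined by a collator.
    static boolean collationEqual(Collator c, String a, String b) {
        return c.compare(a, b) == 0;
    }

    public static void main(String[] args) {
        Collator ci = Collator.getInstance(Locale.ROOT);
        ci.setStrength(Collator.SECONDARY); // case-insensitive

        // "abc" and "ABC" are equal under the collation but not byte-equal,
        // so this collation is NOT binary stable: a hash join keyed on raw
        // bytes would miss this match without a collation key.
        System.out.println(collationEqual(ci, "abc", "ABC")); // true
        System.out.println(binaryEqual("abc", "ABC"));        // false
    }
}
```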






Re: [PR] [WIP][SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-16 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1603367540


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationKey.scala:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.internal.types.StringTypeAnyCollation
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class CollationKey(expr: Expression) extends UnaryExpression with ExpectsInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
+  override def dataType: DataType = BinaryType
+
+  final lazy val collationId: Int = expr.dataType match {
+    case st: StringType =>
+      st.collationId
+  }
+
+  override def nullSafeEval(input: Any): Any = input match {
+    case str: UTF8String =>
+      CollationFactory.getCollationKeyBytes(str, collationId)
+    case _ =>
+      None
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val collation = CollationFactory.fetchCollation(collationId)
+    if (collation.supportsBinaryEquality) {
+      defineCodeGen(ctx, ev, c => s"$c.getBytes()")
+    } else if (collation.supportsLowercaseEquality) {
+      defineCodeGen(ctx, ev, c => s"$c.toLowerCase().getBytes()")
+    } else {
+      defineCodeGen(ctx, ev, c => s"CollationFactory.fetchCollation" +

Review Comment:
   Could you use `getCollationKeyBytes` here?
   Also, IMO, you could just remove `getCollationKeyBytes` from the collation factory and add the logic into `nullSafeEval`.
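The three branches in the `doGenCode` under review can be mirrored as a hypothetical sketch in plain Java (collation names and the `collationKey` helper below are illustrative assumptions; Spark's `CollationFactory` is not used here): binary equality takes the raw bytes as the key, lowercase equality lowercases first, and any other collation asks a collator for a collation key.

```java
import java.nio.charset.StandardCharsets;
import java.text.Collator;
import java.util.Arrays;
import java.util.Locale;

public class CollationKeyPaths {
    // One byte-array key per string, chosen so that strings equal under the
    // named collation produce identical keys.
    static byte[] collationKey(String s, String collation) {
        switch (collation) {
            case "UTF8_BINARY":
                // Binary equality: the raw bytes already are the key.
                return s.getBytes(StandardCharsets.UTF_8);
            case "UTF8_LCASE":
                // Lowercase equality: lowercase first, then take the bytes.
                return s.toLowerCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
            default:
                // General collation: delegate to a collator's collation key.
                Collator c = Collator.getInstance(Locale.ROOT);
                c.setStrength(Collator.SECONDARY);
                return c.getCollationKey(s).toByteArray();
        }
    }

    public static void main(String[] args) {
        // Case-insensitive path: keys collide exactly when strings match.
        System.out.println(Arrays.equals(
            collationKey("Spark", "UTF8_LCASE"), collationKey("SPARK", "UTF8_LCASE"))); // true
        // Binary path: differently-cased strings keep distinct keys.
        System.out.println(Arrays.equals(
            collationKey("Spark", "UTF8_BINARY"), collationKey("SPARK", "UTF8_BINARY"))); // false
    }
}
```

Folding all three paths behind one helper like this is roughly what the review suggests by reusing `getCollationKeyBytes` in both `nullSafeEval` and codegen.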


