MaxGekk commented on code in PR #47154:
URL: https://github.com/apache/spark/pull/47154#discussion_r1768817088
##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##########
@@ -1736,6 +1737,40 @@ class CollationSQLExpressionsSuite
})
}
+ test("Support Mode.eval(buffer) with complex types") {
+ case class UTF8StringModeTestCase[R](
+ collationId: String,
+ bufferValues: Map[InternalRow, Long],
+ result: R)
+
+ val bufferValuesUTF8String: Map[Any, Long] = Map(
+ UTF8String.fromString("a") -> 5L,
+ UTF8String.fromString("b") -> 4L,
+ UTF8String.fromString("B") -> 3L,
+ UTF8String.fromString("d") -> 2L,
+ UTF8String.fromString("e") -> 1L)
+
+ val bufferValuesComplex = bufferValuesUTF8String.map{
+ case (k, v) => (InternalRow.fromSeq(Seq(k, k, k)), v)
+ }
+ val testCasesUTF8String = Seq(
+ UTF8StringModeTestCase("utf8_binary", bufferValuesComplex, "[a,a,a]"),
+ UTF8StringModeTestCase("UTF8_LCASE", bufferValuesComplex, "[b,b,b]"),
+ UTF8StringModeTestCase("unicode_ci", bufferValuesComplex, "[b,b,b]"),
+ UTF8StringModeTestCase("unicode", bufferValuesComplex, "[a,a,a]"))
+
+ testCasesUTF8String.foreach(t => {
Review Comment:
Let's avoid the unnecessary parentheses around the lambda:
```suggestion
testCasesUTF8String.foreach { t =>
```
##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##########
@@ -1801,44 +1810,70 @@ class CollationSQLExpressionsSuite
s"named_struct('f2', collate('$elt', '${t.collationId}')), 'f3',
1)").mkString(",")
}.mkString(",")
- val tableName = s"t_${t.collationId}_mode_nested_struct"
+ val tableName = s"t_${t.collationId}_mode_nested_struct1"
withTable(tableName) {
sql(s"CREATE TABLE ${tableName}(i STRUCT<f1: STRUCT<f2: STRING COLLATE
" +
t.collationId + ">, f3: INT>) USING parquet")
sql(s"INSERT INTO ${tableName} VALUES " + valuesToAdd)
val query = s"SELECT lower(mode(i).f1.f2) FROM ${tableName}"
- if(t.collationId == "UTF8_LCASE" ||
- t.collationId == "unicode_ci" ||
- t.collationId == "unicode") {
- // Cannot resolve "mode(i)" due to data type mismatch:
- // Input to function mode was a complex type with strings collated
on non-binary
- // collations, which is not yet supported.. SQLSTATE: 42K09; line 1
pos 13;
- val params = Seq(("sqlExpr", "\"mode(i)\""),
- ("msg", "The input to the function 'mode' " +
- "was a type of binary-unstable type that is not currently
supported by mode."),
- ("hint", "")).toMap
- checkError(
- exception = intercept[AnalysisException] {
- sql(query)
- },
- condition = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
- parameters = params,
- queryContext = Array(
- ExpectedContext(objectType = "",
- objectName = "",
- startIndex = 13,
- stopIndex = 19,
- fragment = "mode(i)")
- )
- )
- } else {
- checkAnswer(sql(query), Row(t.result))
- }
+ checkAnswer(sql(query), Row(t.result))
}
})
}
test("Support mode for string expression with collated strings in array
complex type") {
+ case class ModeTestCase[R](collationId: String, bufferValues: Map[String,
Long], result: R)
+ val testCases = Seq(
+ ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("UTF8_LCASE", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+ ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b")
+ )
+ testCases.foreach(t => {
Review Comment:
```suggestion
testCases.foreach { t =>
```
##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##########
@@ -1801,44 +1810,70 @@ class CollationSQLExpressionsSuite
s"named_struct('f2', collate('$elt', '${t.collationId}')), 'f3',
1)").mkString(",")
}.mkString(",")
- val tableName = s"t_${t.collationId}_mode_nested_struct"
+ val tableName = s"t_${t.collationId}_mode_nested_struct1"
withTable(tableName) {
sql(s"CREATE TABLE ${tableName}(i STRUCT<f1: STRUCT<f2: STRING COLLATE
" +
t.collationId + ">, f3: INT>) USING parquet")
sql(s"INSERT INTO ${tableName} VALUES " + valuesToAdd)
val query = s"SELECT lower(mode(i).f1.f2) FROM ${tableName}"
- if(t.collationId == "UTF8_LCASE" ||
- t.collationId == "unicode_ci" ||
- t.collationId == "unicode") {
- // Cannot resolve "mode(i)" due to data type mismatch:
- // Input to function mode was a complex type with strings collated
on non-binary
- // collations, which is not yet supported.. SQLSTATE: 42K09; line 1
pos 13;
- val params = Seq(("sqlExpr", "\"mode(i)\""),
- ("msg", "The input to the function 'mode' " +
- "was a type of binary-unstable type that is not currently
supported by mode."),
- ("hint", "")).toMap
- checkError(
- exception = intercept[AnalysisException] {
- sql(query)
- },
- condition = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
- parameters = params,
- queryContext = Array(
- ExpectedContext(objectType = "",
- objectName = "",
- startIndex = 13,
- stopIndex = 19,
- fragment = "mode(i)")
- )
- )
- } else {
- checkAnswer(sql(query), Row(t.result))
- }
+ checkAnswer(sql(query), Row(t.result))
}
})
}
test("Support mode for string expression with collated strings in array
complex type") {
+ case class ModeTestCase[R](collationId: String, bufferValues: Map[String,
Long], result: R)
+ val testCases = Seq(
+ ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("UTF8_LCASE", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+ ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b")
+ )
+ testCases.foreach(t => {
+ val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) =>
+ (0L to numRepeats).map(_ => s"array(named_struct('f2', " +
+ s"collate('$elt', '${t.collationId}'), 'f3', 1))").mkString(",")
+ }.mkString(",")
+
+ val tableName = s"t_${t.collationId}_mode_nested_struct2"
+ withTable(tableName) {
+ sql(s"CREATE TABLE ${tableName}(" +
+ s"i ARRAY< STRUCT<f2: STRING COLLATE ${t.collationId}, f3: INT>>)" +
+ s" USING parquet")
+ sql(s"INSERT INTO ${tableName} VALUES " + valuesToAdd)
+ val query = s"SELECT lower(element_at(mode(i).f2, 1)) FROM
${tableName}"
+ checkAnswer(sql(query), Row(t.result))
+ }
+ })
+ }
+
+ test("Support mode for string expression with collated strings in 3D array
type") {
+ case class ModeTestCase[R](collationId: String, bufferValues: Map[String,
Long], result: R)
+ val testCases = Seq(
+ ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("UTF8_LCASE", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+ ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b")
+ )
+ testCases.foreach(t => {
Review Comment:
```suggestion
testCases.foreach { t =>
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##########
@@ -86,6 +91,49 @@ case class Mode(
buffer
}
+ private def getCollationAwareBuffer(
+ childDataType: DataType,
+ buffer: OpenHashMap[AnyRef, Long]): Iterable[(AnyRef, Long)] = {
+ def groupAndReduceBuffer(groupingFunction: AnyRef => _): Iterable[(AnyRef,
Long)] = {
+ buffer.groupMapReduce(t =>
+ groupingFunction(t._1))(x => x)((x, y) => (x._1, x._2 + y._2)).values
+ }
+ def determineBufferingFunction(
+ childDataType: DataType): Option[AnyRef => _] = {
+ childDataType match {
+ case _ if UnsafeRowUtils.isBinaryStable(child.dataType) => None
+ case _ => Some(collationAwareTransform(_, childDataType))
+ }
+ }
+
determineBufferingFunction(childDataType).map(groupAndReduceBuffer).getOrElse(buffer)
+ }
+
+ private def collationAwareTransform(data: AnyRef, dataType: DataType):
AnyRef = {
+ dataType match {
+ case _ if UnsafeRowUtils.isBinaryStable(dataType) => data
+ case st: StructType =>
+
processStructTypeWithBuffer(data.asInstanceOf[InternalRow].toSeq(st).zip(st.fields))
+ case at: ArrayType => processArrayTypeWithBuffer(at,
data.asInstanceOf[ArrayData])
+ case st: StringType =>
+ CollationFactory.getCollationKey(data.asInstanceOf[UTF8String],
st.collationId)
+ case _ =>
+ throw new SparkUnsupportedOperationException(
+ s"Unsupported data type for collation-aware mode: $dataType")
Review Comment:
Here you are calling this constructor:
```
def this(errorClass: String)
```
and I am sure that `Unsupported data type for collation-aware mode ...`
is a free-form message, not an error class.
##########
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##########
@@ -1852,40 +1887,69 @@ class CollationSQLExpressionsSuite
s"array(collate('$elt', '${t.collationId}'))), 'f3',
1))").mkString(",")
}.mkString(",")
- val tableName = s"t_${t.collationId}_mode_nested_struct"
+ val tableName = s"t_${t.collationId}_mode_highly_nested_struct"
withTable(tableName) {
sql(s"CREATE TABLE ${tableName}(" +
s"i ARRAY<STRUCT<s1: STRUCT<a2: ARRAY<STRING COLLATE
${t.collationId}>>, f3: INT>>)" +
s" USING parquet")
sql(s"INSERT INTO ${tableName} VALUES " + valuesToAdd)
val query = s"SELECT lower(element_at(element_at(mode(i), 1).s1.a2,
1)) FROM ${tableName}"
- if(t.collationId == "UTF8_LCASE" ||
- t.collationId == "unicode_ci" || t.collationId == "unicode") {
- val params = Seq(("sqlExpr", "\"mode(i)\""),
- ("msg", "The input to the function 'mode' was a type" +
- " of binary-unstable type that is not currently supported by
mode."),
- ("hint", "")).toMap
- checkError(
- exception = intercept[AnalysisException] {
- sql(query)
- },
- condition = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
- parameters = params,
- queryContext = Array(
- ExpectedContext(objectType = "",
- objectName = "",
- startIndex = 35,
- stopIndex = 41,
- fragment = "mode(i)")
- )
- )
- } else {
+
checkAnswer(sql(query), Row(t.result))
- }
}
})
}
+ test("Support mode expression with collated in recursively nested struct
with map with keys") {
+ case class ModeTestCase(collationId: String, bufferValues: Map[String,
Long], result: String)
+ Seq(
+ ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "{a ->
1}"),
+ ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "{a ->
1}"),
+ ModeTestCase("utf8_lcase", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "{b ->
1}"),
+ ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "{b ->
1}")
+ ).foreach(t1 => {
Review Comment:
```suggestion
).foreach { t1 =>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]