This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c1b9f28682d8 [SPARK-47327][SQL] Fix thread safety issue in ICU Collator c1b9f28682d8 is described below commit c1b9f28682d8948174a36aa7690df248fcd180dd Author: Stefan Kandic <stefan.kan...@databricks.com> AuthorDate: Tue Mar 12 08:47:08 2024 +0900 [SPARK-47327][SQL] Fix thread safety issue in ICU Collator ### What changes were proposed in this pull request? Freezing the ICU collator upon creation. ### Why are the changes needed? In order to avoid multiple threads writing to the collation buffer during the generation of collation sort keys which then results in data corruption and an internal error. You can read more about collator thread safety [here](https://unicode-org.github.io/icu/userguide/icu/design.html#icu-threading-model-and-open-and-close-model) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test ### Was this patch authored or co-authored using generative AI tooling? no Closes #45436 from stefankandic/icuConcurrencyIssue. 
Authored-by: Stefan Kandic <stefan.kan...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/sql/catalyst/util/CollationFactory.java | 2 ++ .../test/scala/org/apache/spark/sql/CollationSuite.scala | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java index 151a1c9ddbbe..2940900b974a 100644 --- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java @@ -138,11 +138,13 @@ public final class CollationFactory { collationTable[2] = new Collation( "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true); collationTable[2].collator.setStrength(Collator.TERTIARY); + collationTable[2].collator.freeze(); // UNICODE case-insensitive comparison (ROOT locale, in ICU + Secondary strength). 
collationTable[3] = new Collation( "UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false); collationTable[3].collator.setStrength(Collator.SECONDARY); + collationTable[3].collator.freeze(); for (int i = 0; i < collationTable.length; i++) { collationNameToIdMap.put(collationTable[i].collationName, i); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala index 2beb4b27004d..f4d91a04753a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import scala.collection.immutable.Seq +import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable import scala.jdk.CollectionConverters.MapHasAsJava import org.apache.spark.SparkException @@ -438,6 +439,19 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { } } + test("test concurrently generating collation keys") { + // generating ICU sort keys is not thread-safe by default so this should fail + // if we don't handle the concurrency properly on Collator level + + (0 to 10).foreach(_ => { + val collator = CollationFactory.fetchCollation("UNICODE").collator + + (0 to 100).par.foreach { _ => + collator.getCollationKey("aaa") + } + }) + } + test("text writing to parquet with collation enclosed with backticks") { withTempPath{ path => sql(s"select 'a' COLLATE `UNICODE`").write.parquet(path.getAbsolutePath) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org