This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1b9f28682d8 [SPARK-47327][SQL] Fix thread safety issue in ICU Collator
c1b9f28682d8 is described below

commit c1b9f28682d8948174a36aa7690df248fcd180dd
Author: Stefan Kandic <stefan.kan...@databricks.com>
AuthorDate: Tue Mar 12 08:47:08 2024 +0900

    [SPARK-47327][SQL] Fix thread safety issue in ICU Collator
    
    ### What changes were proposed in this pull request?
    
    Freezing the ICU collator upon creation.
    
    ### Why are the changes needed?
    
    Freezing the collator prevents multiple threads from writing to the
    collation buffer during the generation of collation sort keys, which
    otherwise results in data corruption and an internal error.
    You can read more about collator thread safety [here](https://unicode-org.github.io/icu/userguide/icu/design.html#icu-threading-model-and-open-and-close-model)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45436 from stefankandic/icuConcurrencyIssue.
    
    Authored-by: Stefan Kandic <stefan.kan...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/catalyst/util/CollationFactory.java   |  2 ++
 .../test/scala/org/apache/spark/sql/CollationSuite.scala   | 14 ++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
index 151a1c9ddbbe..2940900b974a 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -138,11 +138,13 @@ public final class CollationFactory {
     collationTable[2] = new Collation(
       "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
     collationTable[2].collator.setStrength(Collator.TERTIARY);
+    collationTable[2].collator.freeze();
 
     // UNICODE case-insensitive comparison (ROOT locale, in ICU + Secondary strength).
     collationTable[3] = new Collation(
       "UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false);
     collationTable[3].collator.setStrength(Collator.SECONDARY);
+    collationTable[3].collator.freeze();
 
     for (int i = 0; i < collationTable.length; i++) {
       collationNameToIdMap.put(collationTable[i].collationName, i);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 2beb4b27004d..f4d91a04753a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import scala.collection.immutable.Seq
+import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable
 import scala.jdk.CollectionConverters.MapHasAsJava
 
 import org.apache.spark.SparkException
@@ -438,6 +439,19 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("test concurrently generating collation keys") {
+    // generating ICU sort keys is not thread-safe by default so this should fail
+    // if we don't handle the concurrency properly on Collator level
+
+    (0 to 10).foreach(_ => {
+      val collator = CollationFactory.fetchCollation("UNICODE").collator
+
+      (0 to 100).par.foreach { _ =>
+        collator.getCollationKey("aaa")
+      }
+    })
+  }
+
   test("text writing to parquet with collation enclosed with backticks") {
     withTempPath{ path =>
       sql(s"select 'a' COLLATE `UNICODE`").write.parquet(path.getAbsolutePath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to