Repository: spark
Updated Branches:
  refs/heads/master efef55388 -> d0bc3ed67


[SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query

## What changes were proposed in this pull request?

`Uuid`'s results depend on the random seed assigned during analysis. In a streaming query, the analyzed plan is reused across micro-batches, so every execution produces the same UUIDs. This is incorrect: each execution should generate fresh values.
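
To illustrate the failure mode and the fix, here is a minimal standalone sketch (illustrative only: it uses plain `scala.util.Random` in place of the `Uuid` expression, and `UuidSeedDemo`/`runBatch`/`runBatchFixed` are hypothetical names, not part of the patch):

```scala
import scala.util.Random

object UuidSeedDemo extends App {
  // A generator seeded once at analysis time replays the same sequence on
  // every re-execution of the plan -- this is the bug being fixed.
  val analysisSeed = 42L
  def runBatch(): Seq[Long] = {
    val rng = new Random(analysisSeed) // same seed for every batch
    Seq.fill(3)(rng.nextLong())
  }
  assert(runBatch() == runBatch()) // identical "uuids" across executions

  // The fix draws a fresh seed per incremental execution, analogous to
  // `case _: Uuid => Uuid(Some(random.nextLong()))` in the patch below.
  val perQueryRandom = new Random()
  def runBatchFixed(batchSeed: Long): Seq[Long] = {
    val rng = new Random(batchSeed)
    Seq.fill(3)(rng.nextLong())
  }
  // Distinct seeds make repeated sequences vanishingly unlikely.
  assert(runBatchFixed(perQueryRandom.nextLong()) !=
    runBatchFixed(perQueryRandom.nextLong()))
}
```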

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #21854 from viirya/uuid_in_streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0bc3ed6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0bc3ed6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0bc3ed6

Branch: refs/heads/master
Commit: d0bc3ed6797e0c06f688b7b2ef6c26282a25b175
Parents: efef553
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Thu Aug 2 15:35:46 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Aug 2 15:35:46 2018 -0700

----------------------------------------------------------------------
 .../streaming/IncrementalExecution.scala        |  8 ++++++-
 .../sql/streaming/StreamingQuerySuite.scala     | 22 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 6ae7f28..e9ffe12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.util.Random
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
+import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -73,10 +75,14 @@ class IncrementalExecution(
    * with the desired literal
    */
   override lazy val optimizedPlan: LogicalPlan = {
+    val random = new Random()
+
     sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
+      // SPARK-24896: Set the seed for random number generation in Uuid expressions.
+      case _: Uuid => Uuid(Some(random.nextLong()))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 78199b0..f37f368 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -21,6 +21,8 @@ import java.{util => ju}
 import java.util.Optional
 import java.util.concurrent.CountDownLatch
 
+import scala.collection.mutable
+
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
@@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
@@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckLastBatch(("A", 1)))
   }
 
+  test("Uuid in streaming query should not produce same uuids in each execution") {
+    val uuids = mutable.ArrayBuffer[String]()
+    def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] =>
+      rows.foreach(r => uuids += r.getString(0))
+    }
+
+    val stream = MemoryStream[Int]
+    val df = stream.toDF().select(new Column(Uuid()))
+    testStream(df)(
+      AddData(stream, 1),
+      CheckAnswer(collectUuid),
+      AddData(stream, 2),
+      CheckAnswer(collectUuid)
+    )
+    assert(uuids.distinct.size == 2)
+  }
+
   test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
     "should not fail") {
     val df = spark.readStream.format("rate").load()

