HyukjinKwon commented on a change in pull request #24996:
URL: https://github.com/apache/spark/pull/24996#discussion_r437126007



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
##########
@@ -17,13 +17,94 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year 
interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query 
then terminates
  * the query.
  */
 @Experimental
 @Evolving

Review comment:
       Shall we remove the annotations? it's `private` but the annotations say 
it's an API.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
##########
@@ -17,13 +17,94 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year 
interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query 
then terminates
  * the query.
  */
 @Experimental
 @Evolving
-case object OneTimeTrigger extends Trigger
+private[sql] case object OneTimeTrigger extends Trigger

Review comment:
       Also, let's don't have `private[sql]` since execution package is already 
private per SPARK-16964




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to