[GitHub] [spark] HyukjinKwon commented on a change in pull request #24996: [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users

2020-06-10 Thread GitBox


HyukjinKwon commented on a change in pull request #24996:
URL: https://github.com/apache/spark/pull/24996#discussion_r438483321



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
##
@@ -17,13 +17,94 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental
 @Evolving

Review comment:
   Ah, sure. Thanks :D.
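
For context on the hunk quoted above, here is a minimal usage sketch of the new helpers. This is not code from the PR: the object and value names are made up, and it assumes the caller sits in the same org.apache.spark.sql.execution.streaming package, since `Triggers` is declared `private`.

    // Hypothetical sketch, not part of the PR. Assumes it compiles inside
    // org.apache.spark.sql.execution.streaming, where the private Triggers
    // object above is visible.
    package org.apache.spark.sql.execution.streaming

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._

    object TriggersUsageSketch {
      val fromString: Long   = Triggers.convert("10 seconds")         // 10000 ms
      val fromDuration: Long = Triggers.convert(10.seconds)           // 10000 ms
      val fromUnit: Long     = Triggers.convert(10, TimeUnit.SECONDS) // 10000 ms

      Triggers.validate(fromString)   // passes: the interval is non-negative
      // Triggers.convert("1 month")  // would throw IllegalArgumentException,
                                      // since month/year intervals are rejected
    }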








[GitHub] [spark] HyukjinKwon commented on a change in pull request #24996: [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users

2020-06-09 Thread GitBox


HyukjinKwon commented on a change in pull request #24996:
URL: https://github.com/apache/spark/pull/24996#discussion_r437126007



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
##
@@ -17,13 +17,94 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental
 @Evolving

Review comment:
   Shall we remove the annotations? It's `private`, but the annotations say it's an API.

##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
##
@@ -17,13 +17,94 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
  * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental
 @Evolving
-case object OneTimeTrigger extends Trigger
+private[sql] case object OneTimeTrigger extends Trigger

Review comment:
   Also, let's not have `private[sql]`, since the execution package is already private per SPARK-16964.
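
Taken together with the comment above about the annotations, the suggestion amounts to something like the following declaration. This is a sketch of what is being proposed, not necessarily the code that was finally committed: no @Experimental/@Evolving annotations and no explicit access modifier, relying on org.apache.spark.sql.execution already being an internal package per SPARK-16964.

    package org.apache.spark.sql.execution.streaming

    import org.apache.spark.sql.streaming.Trigger

    // No annotations and no private[sql]: the enclosing execution package is
    // already treated as private/internal (SPARK-16964).
    case object OneTimeTrigger extends Trigger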




