Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20828#discussion_r179594450
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -43,8 +45,39 @@ object MemoryStream {
       protected val currentBlockId = new AtomicInteger(0)
       protected val memoryStreamId = new AtomicInteger(0)
     
    -  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
    -    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +  def apply[A : Encoder](
    +      implicit sqlContext: SQLContext,
    +      trigger: Trigger = Trigger.ProcessingTime(0)): MemoryStreamBase[A] = trigger match {
    +    case _: ContinuousTrigger =>
    +      new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +    case _ =>
    +      new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +  }
    +}
    +
    +abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends BaseStreamingSource {
    --- End diff --
    
    Add docs (Scaladoc) for the new `MemoryStreamBase` class.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to