GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20828#discussion_r179594450

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -43,8 +45,39 @@ object MemoryStream {
       protected val currentBlockId = new AtomicInteger(0)
       protected val memoryStreamId = new AtomicInteger(0)

    -  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
    -    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +  def apply[A : Encoder](
    +      implicit sqlContext: SQLContext,
    +      trigger: Trigger = Trigger.ProcessingTime(0)): MemoryStreamBase[A] = trigger match {
    +    case _: ContinuousTrigger =>
    +      new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +    case _ =>
    +      new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +  }
    +}
    +
    +abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends BaseStreamingSource {
    --- End diff --

    Add docs.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org