Repository: spark
Updated Branches:
  refs/heads/master 55f36641f -> e75488718


http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 4980b0c..3d21bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReader) => r
+          case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300
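
The extractor changes because the continuous rate source is now exposed as RateStreamContinuousReadSupport and DataSourceV2ScanExec carries an extra field. A minimal sketch of the updated lookup, assuming s is the ContinuousExecution bound in the surrounding match (as in the hunk above):

  // Sketch only: the read support is the fifth positional field; the trailing
  // underscore covers the newly added field of DataSourceV2ScanExec.
  val readSupport = s.lastExecution.executedPlan.collectFirst {
    case DataSourceV2ScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
  }.get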

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index 82836dc..3c973d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,9 +27,9 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.test.TestSparkSession
 
 class EpochCoordinatorSuite
@@ -40,20 +40,20 @@ class EpochCoordinatorSuite
 
   private var epochCoordinator: RpcEndpointRef = _
 
-  private var writer: StreamWriter = _
+  private var writeSupport: StreamingWriteSupport = _
   private var query: ContinuousExecution = _
   private var orderVerifier: InOrder = _
 
   override def beforeEach(): Unit = {
-    val reader = mock[ContinuousReader]
-    writer = mock[StreamWriter]
+    val reader = mock[ContinuousReadSupport]
+    writeSupport = mock[StreamingWriteSupport]
     query = mock[ContinuousExecution]
-    orderVerifier = inOrder(writer, query)
+    orderVerifier = inOrder(writeSupport, query)
 
     spark = new TestSparkSession()
 
     epochCoordinator
-      = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, SparkEnv.get)
+      = EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
   }
 
   test("single epoch") {
@@ -209,12 +209,12 @@ class EpochCoordinatorSuite
   }
 
   private def verifyCommit(epoch: Long): Unit = {
-    orderVerifier.verify(writer).commit(eqTo(epoch), any())
+    orderVerifier.verify(writeSupport).commit(eqTo(epoch), any())
     orderVerifier.verify(query).commit(epoch)
   }
 
   private def verifyNoCommitFor(epoch: Long): Unit = {
-    verify(writer, never()).commit(eqTo(epoch), any())
+    verify(writeSupport, never()).commit(eqTo(epoch), any())
     verify(query, never()).commit(epoch)
   }
 

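For reference, a condensed sketch of the renamed beforeEach wiring in EpochCoordinatorSuite, assuming the suite's existing MockitoSugar helpers (mock, inOrder) and the spark session provided by LocalSparkSession; only the collaborator types change, not the EpochCoordinatorRef.create signature:

  // Sketch only: ContinuousReadSupport replaces ContinuousReader and
  // StreamingWriteSupport replaces StreamWriter as the mocked collaborators.
  val reader = mock[ContinuousReadSupport]
  val writeSupport = mock[StreamingWriteSupport]
  val query = mock[ContinuousExecution]
  val orderVerifier = inOrder(writeSupport, query)

  val epochCoordinator =
    EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
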
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 52b833a..aeef4c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -17,73 +17,74 @@
 
 package org.apache.spark.sql.streaming.sources
 
-import java.util.Optional
-
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.InputPartition
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
+import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-case class FakeReader() extends MicroBatchReader with ContinuousReader {
-  def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
-  def getStartOffset: Offset = RateStreamOffset(Map())
-  def getEndOffset: Offset = RateStreamOffset(Map())
-  def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
-  def commit(end: Offset): Unit = {}
-  def readSchema(): StructType = StructType(Seq())
-  def stop(): Unit = {}
-  def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  def setStartOffset(start: Optional[Offset]): Unit = {}
-
-  def planInputPartitions(): java.util.ArrayList[InputPartition[InternalRow]] = {
+case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSupport {
+  override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
+  override def commit(end: Offset): Unit = {}
+  override def stop(): Unit = {}
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
+  override def fullSchema(): StructType = StructType(Seq())
+  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = null
+  override def initialOffset(): Offset = RateStreamOffset(Map())
+  override def latestOffset(): Offset = RateStreamOffset(Map())
+  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
+  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
+    throw new IllegalStateException("fake source - cannot actually read")
+  }
+  override def createContinuousReaderFactory(
+      config: ScanConfig): ContinuousPartitionReaderFactory = {
+    throw new IllegalStateException("fake source - cannot actually read")
+  }
+  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
 
-trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
-  override def createMicroBatchReader(
-      schema: Optional[StructType],
+trait FakeMicroBatchReadSupportProvider extends MicroBatchReadSupportProvider {
+  override def createMicroBatchReadSupport(
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReader = FakeReader()
+      options: DataSourceOptions): MicroBatchReadSupport = FakeReadSupport()
 }
 
-trait FakeContinuousReadSupport extends ContinuousReadSupport {
-  override def createContinuousReader(
-      schema: Optional[StructType],
+trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
+  override def createContinuousReadSupport(
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReader = FakeReader()
+      options: DataSourceOptions): ContinuousReadSupport = FakeReadSupport()
 }
 
-trait FakeStreamWriteSupport extends StreamWriteSupport {
-  override def createStreamWriter(
+trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
+  override def createStreamingWriteSupport(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamWriter = {
+      options: DataSourceOptions): StreamingWriteSupport = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
 
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
+class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupportProvider {
   override def shortName(): String = "fake-read-microbatch-only"
 }
 
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
+class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupportProvider {
   override def shortName(): String = "fake-read-continuous-only"
 }
 
 class FakeReadBothModes extends DataSourceRegister
-    with FakeMicroBatchReadSupport with FakeContinuousReadSupport {
+    with FakeMicroBatchReadSupportProvider with FakeContinuousReadSupportProvider {
   override def shortName(): String = "fake-read-microbatch-continuous"
 }
 
@@ -91,7 +92,7 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
+class FakeWriteSupportProvider extends DataSourceRegister with FakeStreamingWriteSupportProvider {
   override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
@@ -106,8 +107,8 @@ class FakeSink extends Sink {
   override def addBatch(batchId: Long, data: DataFrame): Unit = {}
 }
 
-class FakeWriteV1Fallback extends DataSourceRegister
-  with FakeStreamWriteSupport with StreamSinkProvider {
+class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
+  with FakeStreamingWriteSupportProvider with StreamSinkProvider {
 
   override def createSink(
     sqlContext: SQLContext,
@@ -190,11 +191,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     val v2Query = testPositiveCase(
       "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once())
     assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-      .isInstanceOf[FakeWriteV1Fallback])
+      .isInstanceOf[FakeWriteSupportProviderV1Fallback])
 
     // Ensure we create a V1 sink with the config. Note the config is a comma separated
     // list, including other fake entries.
-    val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
+    val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName
     withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
       val v1Query = testPositiveCase(
         "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once())
@@ -218,35 +219,37 @@ class StreamingDataSourceV2Suite extends StreamTest {
       val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
       (readSource, writeSource, trigger) match {
         // Valid microbatch queries.
-        case (_: MicroBatchReadSupport, _: StreamWriteSupport, t)
+        case (_: MicroBatchReadSupportProvider, _: StreamingWriteSupportProvider, t)
           if !t.isInstanceOf[ContinuousTrigger] =>
           testPositiveCase(read, write, trigger)
 
         // Valid continuous queries.
-        case (_: ContinuousReadSupport, _: StreamWriteSupport, _: ContinuousTrigger) =>
+        case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
+              _: ContinuousTrigger) =>
           testPositiveCase(read, write, trigger)
 
         // Invalid - can't read at all
         case (r, _, _)
-            if !r.isInstanceOf[MicroBatchReadSupport]
-              && !r.isInstanceOf[ContinuousReadSupport] =>
+            if !r.isInstanceOf[MicroBatchReadSupportProvider]
+              && !r.isInstanceOf[ContinuousReadSupportProvider] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support streamed reading")
 
         // Invalid - can't write
-        case (_, w, _) if !w.isInstanceOf[StreamWriteSupport] =>
+        case (_, w, _) if !w.isInstanceOf[StreamingWriteSupportProvider] =>
           testNegativeCase(read, write, trigger,
             s"Data source $write does not support streamed writing")
 
         // Invalid - trigger is continuous but reader is not
-        case (r, _: StreamWriteSupport, _: ContinuousTrigger)
-            if !r.isInstanceOf[ContinuousReadSupport] =>
+        case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
+            if !r.isInstanceOf[ContinuousReadSupportProvider] =>
           testNegativeCase(read, write, trigger,
             s"Data source $read does not support continuous processing")
 
         // Invalid - trigger is microbatch but reader is not
         case (r, _, t)
-           if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
+           if !r.isInstanceOf[MicroBatchReadSupportProvider] &&
+             !t.isInstanceOf[ContinuousTrigger] =>
           testPostCreationNegativeCase(read, write, trigger,
             s"Data source $read does not support microbatch processing")
       }
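
After the rename, the dispatch keys on the provider interfaces (MicroBatchReadSupportProvider, ContinuousReadSupportProvider, StreamingWriteSupportProvider) rather than the old read/write support traits. A condensed sketch, using a hypothetical helper name and the provider and trigger types imported at the top of the file:

  // Sketch only (hypothetical helper, not part of the patch): which
  // (read, write, trigger) combinations the suite treats as valid.
  def isValidStreamingPair(readSource: AnyRef, writeSource: AnyRef, trigger: Trigger): Boolean =
    (readSource, writeSource, trigger) match {
      case (_: MicroBatchReadSupportProvider, _: StreamingWriteSupportProvider, t)
          if !t.isInstanceOf[ContinuousTrigger] => true  // valid microbatch query
      case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
            _: ContinuousTrigger) => true                // valid continuous query
      case _ => false  // all other combinations are rejected with a source-specific error
    }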

