[1/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader

2018-01-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 08252bb38 -> 0a441d2ed


http://git-wip-us.apache.org/repos/asf/spark/blob/0a441d2e/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index a0f5695..1acff61 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -34,11 +34,14 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
@@ -49,9 +52,11 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
   override val streamingTimeout = 30.seconds
 
+  protected val brokerProps = Map[String, Object]()
+
   override def beforeAll(): Unit = {
     super.beforeAll()
-    testUtils = new KafkaTestUtils
+    testUtils = new KafkaTestUtils(brokerProps)
     testUtils.setup()
   }
 
@@ -59,18 +64,25 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
     if (testUtils != null) {
       testUtils.teardown()
       testUtils = null
-      super.afterAll()
     }
+    super.afterAll()
   }
 
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
     // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
-    // its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
+    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
     // we don't know which data should be fetched when `startingOffsets` is latest.
-    q.processAllAvailable()
+    q match {
+      case c: ContinuousExecution => c.awaitEpoch(0)
+      case m: MicroBatchExecution => m.processAllAvailable()
+    }
     true
   }
 
+  protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
+    testUtils.addPartitions(topic, newCount)
+  }
+
   /**
    * Add data to Kafka.
    *
@@ -82,10 +94,11 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       message: String = "",
       topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
 
-    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
-      if (query.get.isActive) {
+    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+      query match {
         // Make sure no Spark job is running when deleting a topic
-        query.get.processAllAvailable()
+        case Some(m: MicroBatchExecution) => m.processAllAvailable()
+        case _ =>
       }
 
       val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
@@ -97,16 +110,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
         topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
       }
 
-      // Read all topics again in case some topics are delete.
-      val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys
       require(
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active kafka source")
 
       val sources = query.get.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] =>
-          source.asInstanceOf[KafkaSource]
-      }
+        case StreamingExecutionRelation(source: KafkaSource, _) => source
+      } ++ (query.get.lastExecution match {
+        case null => Seq()
+        case e => e.logical.collect {
+          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+        }
+      })
       if (sources.isEmpty) {
         throw new Exception(
           "Could not find Kafka source in the StreamExecution logical plan to add

[1/2] spark git commit: [SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader

2018-01-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master a9b845ebb -> 166705785


http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index a0f5695..1acff61 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -34,11 +34,14 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
@@ -49,9 +52,11 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
   override val streamingTimeout = 30.seconds
 
+  protected val brokerProps = Map[String, Object]()
+
   override def beforeAll(): Unit = {
     super.beforeAll()
-    testUtils = new KafkaTestUtils
+    testUtils = new KafkaTestUtils(brokerProps)
     testUtils.setup()
   }
 
@@ -59,18 +64,25 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
     if (testUtils != null) {
       testUtils.teardown()
       testUtils = null
-      super.afterAll()
     }
+    super.afterAll()
   }
 
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
     // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
-    // its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
+    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
    // we don't know which data should be fetched when `startingOffsets` is latest.
-    q.processAllAvailable()
+    q match {
+      case c: ContinuousExecution => c.awaitEpoch(0)
+      case m: MicroBatchExecution => m.processAllAvailable()
+    }
     true
   }
 
+  protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
+    testUtils.addPartitions(topic, newCount)
+  }
+
   /**
    * Add data to Kafka.
    *
@@ -82,10 +94,11 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       message: String = "",
       topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
 
-    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
-      if (query.get.isActive) {
+    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+      query match {
         // Make sure no Spark job is running when deleting a topic
-        query.get.processAllAvailable()
+        case Some(m: MicroBatchExecution) => m.processAllAvailable()
+        case _ =>
       }
 
       val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
@@ -97,16 +110,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
         topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
       }
 
-      // Read all topics again in case some topics are delete.
-      val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys
       require(
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active kafka source")
 
       val sources = query.get.logicalPlan.collect {
-        case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] =>
-          source.asInstanceOf[KafkaSource]
-      }
+        case StreamingExecutionRelation(source: KafkaSource, _) => source
+      } ++ (query.get.lastExecution match {
+        case null => Seq()
+        case e => e.logical.collect {
+          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+        }
+      })
       if (sources.isEmpty) {
         throw new Exception(
           "Could not find Kafka source in the StreamExecution logical plan to add
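The test changes in both branches exist because the same Kafka source query can now run under continuous processing as well as micro-batching, which is why the suite imports Trigger and matches on KafkaContinuousReader in the V2 plan. As a rough sketch of what such a query looks like (illustrative only, not part of the patch; it assumes the spark-sql-kafka-0-10 package is on the classpath, and the broker address, topic name, and checkpoint path are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("continuous-kafka-sketch")
  .getOrCreate()

// Read from Kafka; with a continuous trigger this resolves to the new
// KafkaContinuousReader (DataSource V2) rather than the micro-batch KafkaSource.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
  .option("subscribe", "topic-1")                      // hypothetical topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Trigger.Continuous switches the engine from MicroBatchExecution to
// ContinuousExecution, checkpointing offsets once per second.
val query = kafkaStream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/continuous-kafka-sketch") // hypothetical path
  .trigger(Trigger.Continuous("1 second"))
  .start()

query.awaitTermination()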