Repository: spark
Updated Branches:
  refs/heads/master a5a4b8350 -> d6f5e172b


Revert "[SPARK-23303][SQL] improve the explain result for data source v2 
relations"

This reverts commit f17b936f0ddb7d46d1349bd42f9a64c84c06e48d.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f5e172
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f5e172
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f5e172

Branch: refs/heads/master
Commit: d6f5e172b480c62165be168deae0deff8062f476
Parents: a5a4b83
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Feb 13 16:21:17 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Feb 13 16:21:17 2018 -0800

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousSourceSuite.scala   | 18 +++-
 .../sql/kafka010/KafkaContinuousTest.scala      |  3 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  3 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  8 +-
 .../datasources/v2/DataSourceReaderHolder.scala | 64 +++++++++++++
 .../datasources/v2/DataSourceV2QueryPlan.scala  | 96 --------------------
 .../datasources/v2/DataSourceV2Relation.scala   | 26 +++---
 .../datasources/v2/DataSourceV2ScanExec.scala   |  6 +-
 .../datasources/v2/DataSourceV2Strategy.scala   |  4 +-
 .../v2/PushDownOperatorsToDataSource.scala      |  4 +-
 .../streaming/MicroBatchExecution.scala         | 22 ++---
 .../continuous/ContinuousExecution.scala        |  9 +-
 .../spark/sql/streaming/StreamSuite.scala       |  8 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  2 +-
 .../streaming/continuous/ContinuousSuite.scala  | 11 ++-
 15 files changed, 127 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
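
The common thread across the hunks below: DataSourceV2Relation reverts to a plain two-field case class (output, reader), so callers bind a specific reader by destructuring the relation instead of an isInstanceOf/asInstanceOf pair. A minimal sketch of the restored usage, built only from types visible in this diff (the helper name findContinuousReader is illustrative and not part of the commit):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
    import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader

    // After the revert, pattern matching on the relation binds the reader directly,
    // mirroring the updated test suites and ContinuousExecution below.
    def findContinuousReader(plan: LogicalPlan): Option[ContinuousReader] =
      plan.collectFirst {
        case DataSourceV2Relation(_, r: ContinuousReader) => r
      }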


http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 72ee0c5..a7083fa 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -17,9 +17,20 @@
 
 package org.apache.spark.sql.kafka010
 
-import org.apache.spark.sql.Dataset
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
 class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -60,8 +71,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
-              case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
-                r.reader.asInstanceOf[KafkaContinuousReader]
+              case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
             }.exists { r =>
               // Ensure the new topic is present and the old topic is gone.
               r.knownPartitions.exists(_.topic == topic2)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index d34458a..5a1a14f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -47,8 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
-            r.reader.asInstanceOf[KafkaContinuousReader]
+          case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index cb09cce..02c8764 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -117,8 +117,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       } ++ (query.get.lastExecution match {
         case null => Seq()
         case e => e.logical.collect {
-          case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
-            r.reader.asInstanceOf[KafkaContinuousReader]
+          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
         }
       })
       if (sources.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 984b651..fcaf8d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -189,9 +189,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val ds = cls.newInstance().asInstanceOf[DataSourceV2]
+      val ds = cls.newInstance()
       val options = new DataSourceOptions((extraOptions ++
-        DataSourceV2Utils.extractSessionConfigs(ds, sparkSession.sessionState.conf)).asJava)
+        DataSourceV2Utils.extractSessionConfigs(
+          ds = ds.asInstanceOf[DataSourceV2],
+          conf = sparkSession.sessionState.conf)).asJava)
 
       // Streaming also uses the data source V2 API. So it may be that the data source implements
       // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
@@ -219,7 +221,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       if (reader == null) {
         loadV1Source(paths: _*)
       } else {
-        Dataset.ofRows(sparkSession, DataSourceV2Relation(ds, reader))
+        Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
       }
     } else {
       loadV1Source(paths: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
new file mode 100644
index 0000000..81219e9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.Objects
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * A base class for data source reader holder with customized equals/hashCode methods.
+ */
+trait DataSourceReaderHolder {
+
+  /**
+   * The output of the data source reader, w.r.t. column pruning.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * The held data source reader.
+   */
+  def reader: DataSourceReader
+
+  /**
+   * The metadata of this data source reader that can be used for equality test.
+   */
+  private def metadata: Seq[Any] = {
+    val filters: Any = reader match {
+      case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+      case s: SupportsPushDownFilters => s.pushedFilters().toSet
+      case _ => Nil
+    }
+    Seq(output, reader.getClass, filters)
+  }
+
+  def canEqual(other: Any): Boolean
+
+  override def equals(other: Any): Boolean = other match {
+    case other: DataSourceReaderHolder =>
+      canEqual(other) && metadata.length == other.metadata.length &&
+        metadata.zip(other.metadata).forall { case (l, r) => l == r }
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
+  }
+}
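
The restored trait above keys equality on (output attributes, reader class, pushed filters) rather than on the reader instance, so two relations over the same source with identical pruning and pushdown compare equal even when their readers are distinct objects. A small sketch of what that implies, assuming the relations were already collected from a plan (dedup is a hypothetical helper, not part of the commit):

    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    // Relies on the equals/hashCode defined by DataSourceReaderHolder:
    // relations differing only in reader *instance* collapse to one entry.
    def dedup(relations: Seq[DataSourceV2Relation]): Seq[DataSourceV2Relation] =
      relations.distinct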

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
deleted file mode 100644
index 1e0d088..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import java.util.Objects
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.DataSourceV2
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.util.Utils
-
-/**
- * A base class for data source v2 related query plan(both logical and physical). It defines the
- * equals/hashCode methods, and provides a string representation of the query plan, according to
- * some common information.
- */
-trait DataSourceV2QueryPlan {
-
-  /**
-   * The output of the data source reader, w.r.t. column pruning.
-   */
-  def output: Seq[Attribute]
-
-  /**
-   * The instance of this data source implementation. Note that we only consider its class in
-   * equals/hashCode, not the instance itself.
-   */
-  def source: DataSourceV2
-
-  /**
-   * The created data source reader. Here we use it to get the filters that has been pushed down
-   * so far, itself doesn't take part in the equals/hashCode.
-   */
-  def reader: DataSourceReader
-
-  private lazy val filters = reader match {
-    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
-    case s: SupportsPushDownFilters => s.pushedFilters().toSet
-    case _ => Set.empty
-  }
-
-  /**
-   * The metadata of this data source query plan that can be used for equality check.
-   */
-  private def metadata: Seq[Any] = Seq(output, source.getClass, filters)
-
-  def canEqual(other: Any): Boolean
-
-  override def equals(other: Any): Boolean = other match {
-    case other: DataSourceV2QueryPlan => canEqual(other) && metadata == other.metadata
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
-  }
-
-  def metadataString: String = {
-    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
-    if (filters.nonEmpty) entries += "PushedFilter" -> filters.mkString("[", ", ", "]")
-
-    val outputStr = Utils.truncatedString(output, "[", ", ", "]")
-
-    val entriesStr = if (entries.nonEmpty) {
-      Utils.truncatedString(entries.map {
-        case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100)
-      }, " (", ", ", ")")
-    } else {
-      ""
-    }
-
-    s"${source.getClass.getSimpleName}$outputStr$entriesStr"
-  }
-
-  private def redact(text: String): String = {
-    Utils.redact(SQLConf.get.stringRedationPattern, text)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index cd97e0c..38f6b15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -20,23 +20,15 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 
 case class DataSourceV2Relation(
     output: Seq[AttributeReference],
-    source: DataSourceV2,
-    reader: DataSourceReader,
-    override val isStreaming: Boolean)
-  extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {
+    reader: DataSourceReader)
+  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
 
-  override def simpleString: String = {
-    val streamingHeader = if (isStreaming) "Streaming " else ""
-    s"${streamingHeader}Relation $metadataString"
-  }
-
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -49,8 +41,18 @@ case class DataSourceV2Relation(
   }
 }
 
+/**
+ * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
+ * to the non-streaming relation.
+ */
+class StreamingDataSourceV2Relation(
+    output: Seq[AttributeReference],
+    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
+  override def isStreaming: Boolean = true
+}
+
 object DataSourceV2Relation {
-  def apply(source: DataSourceV2, reader: DataSourceReader): DataSourceV2Relation = {
-    new DataSourceV2Relation(reader.readSchema().toAttributes, source, reader, isStreaming = false)
+  def apply(reader: DataSourceReader): DataSourceV2Relation = {
+    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
   }
 }
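
With the relation reverted to the shape above, a batch relation comes from the one-argument apply (output derived from reader.readSchema(), isStreaming left false), and the streaming case is the StreamingDataSourceV2Relation subclass. A small sketch, assuming reader is an already-constructed DataSourceReader (its creation is source specific and not shown; relationsFor is illustrative only):

    import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation}
    import org.apache.spark.sql.sources.v2.reader.DataSourceReader

    def relationsFor(reader: DataSourceReader) = {
      val batch = DataSourceV2Relation(reader)                                // isStreaming = false
      val streaming = new StreamingDataSourceV2Relation(batch.output, reader) // isStreaming = true
      (batch, streaming)
    }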

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index c99d535..7d9581b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 import org.apache.spark.sql.types.StructType
@@ -37,14 +36,11 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     output: Seq[AttributeReference],
-    @transient source: DataSourceV2,
     @transient reader: DataSourceReader)
-  extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan {
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def simpleString: String = s"Scan $metadataString"
-
   override def outputPartitioning: physical.Partitioning = reader match {
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index fb61e6f..df5b524 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil
+    case DataSourceV2Relation(output, reader) =>
+      DataSourceV2ScanExec(output, reader) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 4cfdd50..1ca6cbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -39,11 +39,11 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // TODO: Ideally column pruning should be implemented via a plan property that is propagated
     // top-down, then we can simplify the logic here and only collect target operators.
     val filterPushed = plan transformUp {
-      case FilterAndProject(fields, condition, r: DataSourceV2Relation) =>
+      case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
         val (candidates, nonDeterministic) =
           splitConjunctivePredicates(condition).partition(_.deterministic)
 
-        val stayUpFilters: Seq[Expression] = r.reader match {
+        val stayUpFilters: Seq[Expression] = reader match {
           case r: SupportsPushDownCatalystFilters =>
             r.pushCatalystFilters(candidates.toArray)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 84564b6..8125333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,8 +52,6 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
 
-  private val readerToDataSourceMap = MutableMap.empty[MicroBatchReader, DataSourceV2]
-
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
     case OneTimeTrigger => OneTimeExecutor()
@@ -92,7 +90,6 @@ class MicroBatchExecution(
             metadataPath,
             new DataSourceOptions(options.asJava))
           nextSourceId += 1
-          readerToDataSourceMap(reader) = source
           StreamingExecutionRelation(reader, output)(sparkSession)
         })
       case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
@@ -408,15 +405,12 @@ class MicroBatchExecution(
             case v1: SerializedOffset => reader.deserializeOffset(v1.json)
             case v2: OffsetV2 => v2
           }
-          reader.setOffsetRange(toJava(current), Optional.of(availableV2))
+          reader.setOffsetRange(
+            toJava(current),
+            Optional.of(availableV2))
           logDebug(s"Retrieving data from $reader: $current -> $availableV2")
-          Some(reader -> new DataSourceV2Relation(
-            reader.readSchema().toAttributes,
-            // Provide a fake value here just in case something went wrong, e.g. the reader gives
-            // a wrong `equals` implementation.
-            readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2),
-            reader,
-            isStreaming = true))
+          Some(reader ->
+            new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
         case _ => None
       }
     }
@@ -506,5 +500,3 @@ class MicroBatchExecution(
     Optional.ofNullable(scalaOption.orNull)
   }
 }
-
-object FakeDataSourceV2 extends DataSourceV2

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index f87d57d..c3294d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
@@ -167,7 +167,7 @@ class ContinuousExecution(
 
     var insertedSourceId = 0
     val withNewSources = logicalPlan transform {
-      case ContinuousExecutionRelation(ds, _, output) =>
+      case ContinuousExecutionRelation(_, _, output) =>
         val reader = continuousSources(insertedSourceId)
         insertedSourceId += 1
         val newOutput = reader.readSchema().toAttributes
@@ -180,7 +180,7 @@ class ContinuousExecution(
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
         reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
-        new DataSourceV2Relation(newOutput, ds, reader, isStreaming = true)
+        new StreamingDataSourceV2Relation(newOutput, reader)
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
@@ -201,8 +201,7 @@ class ContinuousExecution(
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
-      case r: DataSourceV2Relation if r.reader.isInstanceOf[ContinuousReader] =>
-        r.reader.asInstanceOf[ContinuousReader]
+      case DataSourceV2Relation(_, r: ContinuousReader) => r
     }.head
 
     reportTimeTaken("queryPlanning") {

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 70eb9f0..d1a0483 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -492,16 +492,16 @@ class StreamSuite extends StreamTest {
 
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
-      assert("Streaming Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming 
physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("Streaming Relation".r.findAllMatchIn(explainWithExtended).size === 3)
-      assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithExtended).size === 1)
+      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3)
+      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming 
physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 2543946..37fe595 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -605,7 +605,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                 plan
                   .collect {
                     case StreamingExecutionRelation(s, _) => s
-                    case d: DataSourceV2Relation => d.reader
+                    case DataSourceV2Relation(_, r) => r
                   }
                   .zipWithIndex
                   .find(_._1 == source)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f5e172/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 9ee9aaf..4b4ed82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.streaming.continuous
 
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import java.util.UUID
+
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
@@ -40,7 +43,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, _, r: RateStreamContinuousReader) => r
+          case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
         }.get
 
         val deltaMs = numTriggers * 1000 + 300

