cloud-fan commented on a change in pull request #29756:
URL: https://github.com/apache/spark/pull/29756#discussion_r494064042



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -861,15 +862,22 @@ class Analyzer(
         lookupTempView(ident).map(_ => 
ResolvedView(ident.asIdentifier)).getOrElse(u)
     }
 
-    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+    def lookupTempView(
+        identifier: Seq[String], isStreaming: Boolean = false): 
Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView) return None
 
-      identifier match {
+      val tmpView = identifier match {
         case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
         case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, 
part2)
         case _ => None
       }
+
+      if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+        throw new AnalysisException("The temp view related to non-streaming 
relation is " +

Review comment:
       ```
   s"${identifier.quoted} is a temp view of batch logical plan, please use batch 
API such as `DataFrameReader.table` to read it."
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -861,15 +862,22 @@ class Analyzer(
         lookupTempView(ident).map(_ => 
ResolvedView(ident.asIdentifier)).getOrElse(u)
     }
 
-    def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+    def lookupTempView(
+        identifier: Seq[String], isStreaming: Boolean = false): 
Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView) return None
 
-      identifier match {
+      val tmpView = identifier match {
         case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
         case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, 
part2)
         case _ => None
       }
+
+      if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+        throw new AnalysisException("The temp view related to non-streaming 
relation is " +

Review comment:
       ```
   s"${identifier.quoted} is not a temp view of streaming logical plan, please 
use batch API such as `DataFrameReader.table` to read it."
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1020,16 +1039,38 @@ class Analyzer(
     // 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2 
relation.
     private def lookupRelation(
         identifier: Seq[String],
-        options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
+        options: CaseInsensitiveStringMap,
+        isStreaming: Boolean): Option[LogicalPlan] = {
       expandRelationName(identifier) match {
         case SessionCatalogAndIdentifier(catalog, ident) =>
           lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
             case v1Table: V1Table =>
-              v1SessionCatalog.getRelation(v1Table.v1Table, options)
+              if (isStreaming) {
+                if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
+                  throw new AnalysisException("Stream reading does not support 
views.")

Review comment:
       ```
   s"${identifier.quoted} is a permanent view, which is not supported by 
streaming reading API such as `DataStreamReader.table` yet."
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Table}

Review comment:
       Is this an unnecessary change?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
##########
@@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable) 
extends Table {
 
   override def toString: String = s"V1Table($name)"
 }
+
+/**
+ * A V2 table with V1 fallback support. This is used to fallback to V1 table 
when the V2 one
+ * doesn't implement specific capabilities but V1 already has.
+ */
+trait V2TableWithV1Fallback extends Table {

Review comment:
       Please mark this trait `private[sql]`, consistent with `V1Table` above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to