cloud-fan commented on a change in pull request #29756:
URL: https://github.com/apache/spark/pull/29756#discussion_r494064042
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -861,15 +862,22 @@ class Analyzer(
lookupTempView(ident).map(_ =>
ResolvedView(ident.asIdentifier)).getOrElse(u)
}
- def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+ def lookupTempView(
+ identifier: Seq[String], isStreaming: Boolean = false):
Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView) return None
- identifier match {
+ val tmpView = identifier match {
case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1,
part2)
case _ => None
}
+
+ if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+ throw new AnalysisException("The temp view related to non-streaming
relation is " +
Review comment:
```
"${identifier.quoted} is a temp view of batch logical plan, please use batch
API such as `DataFrameReader.table` to read it."
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -861,15 +862,22 @@ class Analyzer(
lookupTempView(ident).map(_ =>
ResolvedView(ident.asIdentifier)).getOrElse(u)
}
- def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
+ def lookupTempView(
+ identifier: Seq[String], isStreaming: Boolean = false):
Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView) return None
- identifier match {
+ val tmpView = identifier match {
case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1,
part2)
case _ => None
}
+
+ if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
+ throw new AnalysisException("The temp view related to non-streaming
relation is " +
Review comment:
```
"${identifier.quoted} is not a temp view of streaming logical plan, please
use batch API such as `DataFrameReader.table` to read it."
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1020,16 +1039,38 @@ class Analyzer(
// 3) If a v1 table is found, create a v1 relation. Otherwise, create a v2
relation.
private def lookupRelation(
identifier: Seq[String],
- options: CaseInsensitiveStringMap): Option[LogicalPlan] = {
+ options: CaseInsensitiveStringMap,
+ isStreaming: Boolean): Option[LogicalPlan] = {
expandRelationName(identifier) match {
case SessionCatalogAndIdentifier(catalog, ident) =>
lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table =>
- v1SessionCatalog.getRelation(v1Table.v1Table, options)
+ if (isStreaming) {
+ if (v1Table.v1Table.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException("Stream reading does not support
views.")
Review comment:
```
"${identifier.quoted} is a permanent view, which is not supported by
streaming reading API such as `DataStreamReader.table` yet."
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap, Attri
import org.apache.spark.sql.catalyst.plans.logical._
import
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Table}
Review comment:
unnecessary change?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
##########
@@ -80,3 +80,11 @@ private[sql] case class V1Table(v1Table: CatalogTable)
extends Table {
override def toString: String = s"V1Table($name)"
}
+
+/**
+ * A V2 table with V1 fallback support. This is used to fallback to V1 table
when the V2 one
+ * doesn't implement specific capabilities but V1 already has.
+ */
+trait V2TableWithV1Fallback extends Table {
Review comment:
please add `private[sql]`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]