gengliangwang commented on code in PR #56481:
URL: https://github.com/apache/spark/pull/56481#discussion_r3431947959


##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -869,6 +869,28 @@ private[sql] object QueryParsingErrors extends 
DataTypeErrorsBase {
       ctx)
   }
 
+  def invalidAtSyntaxTimestamp(
+      timestamp: String, format: String, ctx: ParserRuleContext): Throwable = {
+    new ParseException(
+      errorClass = "INVALID_TIMESTAMP_FORMAT",

Review Comment:
   `INVALID_TIMESTAMP_FORMAT` is a generic, top-level error name, but it's only 
ever used for the `@`-syntax time-travel timestamp. The codebase already has a 
time-travel timestamp error family — 
`INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.{INPUT,UNEVALUABLE,NON_DETERMINISTIC,OPTION}`
 and `INVALID_TIME_TRAVEL_SPEC`. Could this fit there instead, e.g. a new 
`INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.FORMAT` sub-condition or a dedicated 
`INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT`? Error-condition names are stable API, 
so worth settling before this ships.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala:
##########
@@ -2193,6 +2194,69 @@ class PlanParserSuite extends AnalysisTest {
         stop = 38))
   }
 
+  test("at syntax time travel") {
+    def versionPlan(version: String): LogicalPlan = {
+      Project(Seq(UnresolvedStar(None)),
+        RelationTimeTravel(UnresolvedRelation(Seq("a", "b", "c")), None, 
Some(version)))
+    }
+    assertEqual("SELECT * FROM a.b.c@v123456789", versionPlan("123456789"))
+    assertEqual("SELECT * FROM a.b.c@V123456789", versionPlan("123456789"))
+    assertEqual("SELECT * FROM a.b.c @v123456789", versionPlan("123456789"))
+    assertEqual("SELECT * FROM a.b.c@v'Snapshot123456789'", 
versionPlan("Snapshot123456789"))
+
+    val micros = DateTimeUtils.stringToTimestampAnsi(
+      UTF8String.fromString("2019-01-29 00:37:58"),
+      DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    assertEqual("SELECT * FROM a.b.c@20190129003758000",
+      Project(Seq(UnresolvedStar(None)),
+        RelationTimeTravel(
+          UnresolvedRelation(Seq("a", "b", "c")),
+          Some(Literal(micros, TimestampType)),
+          None)))
+
+    assertEqual("SELECT * FROM `t@v1`",
+      Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("t@v1"))))
+
+    checkError(
+      exception = parseException("SELECT * FROM t@v1 VERSION AS OF 2"),
+      condition = "MULTIPLE_TIME_TRAVEL_SPEC",
+      parameters = Map.empty,
+      context = ExpectedContext(fragment = "VERSION AS OF 2", start = 19, stop 
= 33))
+
+    checkError(
+      exception = parseException("SELECT * FROM t@123"),
+      condition = "INVALID_TIMESTAMP_FORMAT",

Review Comment:
   This covers the wrong-*length* timestamp. Consider also adding a 
malformed-*component* case (e.g. `t@20191301000000000`, month 13) — it 
exercises the `DateTimeException -> INVALID_TIMESTAMP_FORMAT` catch branch in 
`parseAtSyntaxTimestamp`, which a wrong-length input never reaches.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -6994,6 +6994,17 @@ object SQLConf {
       .stringConf
       .createWithDefault("versionAsOf")
 
+  val TIME_TRAVEL_AT_SYNTAX_ENABLED =
+    buildConf("spark.sql.timeTravel.atSyntax.enabled")
+      .doc("When true, a table name in a query or in table-reading APIs can 
carry a time " +
+        "travel suffix: 'name@v123' reads version 123 of the table, and " +
+        "'name@20240101000000000' (format yyyyMMddHHmmssSSS, interpreted in 
the session " +
+        "time zone) reads the table as of that timestamp. When false, '@' in 
table names " +
+        "fails at parse time.")
+      .version("5.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   This enables the new `@` syntax by default. The backward-compat risk looks 
low — unquoted `@` was previously a parse error, and `@`-named tables need 
backticks (keeping `@` inside the identifier) — but is turning on a brand-new 
(and `[WIP]`) syntax by default intended here, rather than starting as opt-in? 
Just flagging the default for an explicit decision.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2634,13 +2640,83 @@ class AstBuilder extends DataTypeAstBuilder
    * Create an aliased table reference. This is typically used in FROM clauses.
    */
   override def visitTableName(ctx: TableNameContext): LogicalPlan = 
withOrigin(ctx) {
-    val relation = createUnresolvedRelation(ctx.identifierReference, 
Option(ctx.optionsClause))
+    val ttCtx = ctx.temporalTableIdentifierReference
+    val (atTimestamp, atVersion) = temporalSpec(ttCtx, ttCtx.timestamp, 
ttCtx.version)
+    val hasAtSpec = atTimestamp.isDefined || atVersion.isDefined
+    if (hasAtSpec && ctx.temporalClause != null) {
+      withOrigin(ctx.temporalClause) {
+        throw QueryParsingErrors.multipleTimeTravelSpec(ctx.temporalClause)
+      }
+    }
+    val relation = createUnresolvedRelation(ttCtx.identifierReference, 
Option(ctx.optionsClause))
+    val withAtSpec = if (hasAtSpec) {
+      RelationTimeTravel(relation, atTimestamp, atVersion)

Review Comment:
   This inlines exactly what `TemporalIdentifier.wrapTimeTravel` already does 
(`if (isTemporal) RelationTimeTravel(...) else plan`), which the reader 
(`DataFrameReader.table()`) and Connect paths reuse. Minor, but a small shared 
helper taking `(plan, timestamp, version)` would let all three paths go through 
one place.



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1671,10 +1671,17 @@ class SparkConnectPlanner(
 
     rel.getReadTypeCase match {
       case proto.Read.ReadTypeCase.NAMED_TABLE =>
-        UnresolvedRelation(
-          
parser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier),
+        val temporalIdent =
+          
parser.parseTemporalTableIdentifier(rel.getNamedTable.getUnparsedIdentifier)
+        if (temporalIdent.isTemporal && rel.getIsStreaming) {

Review Comment:
   Good that Connect rejects `@` + streaming with a clear 
`UNSUPPORTED_FEATURE.TIME_TRAVEL`. The classic `DataStreamReader.table()` 
(`sql/core/.../classic/DataStreamReader.scala`, still on 
`parseMultipartIdentifier`) has no equivalent guard, so 
`spark.readStream.table("t@v1")` falls through to a generic 
`PARSE_SYNTAX_ERROR`. It was already an error pre-PR so this isn't a 
regression, but for parity consider routing the classic streaming `table()` 
through `parseTemporalTableIdentifier` and raising the same 
`timeTravelUnsupportedError`.



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1671,10 +1671,17 @@ class SparkConnectPlanner(
 
     rel.getReadTypeCase match {
       case proto.Read.ReadTypeCase.NAMED_TABLE =>
-        UnresolvedRelation(
-          
parser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier),
+        val temporalIdent =
+          
parser.parseTemporalTableIdentifier(rel.getNamedTable.getUnparsedIdentifier)
+        if (temporalIdent.isTemporal && rel.getIsStreaming) {
+          throw QueryCompilationErrors.timeTravelUnsupportedError(
+            QuotingUtils.quoteNameParts(temporalIdent.nameParts))

Review Comment:
   The five other `timeTravelUnsupportedError` call sites pass `toSQLId(...)`; 
this one uses `QuotingUtils.quoteNameParts`. They produce the same string for 
ordinary names (`toSQLId` additionally strips the 
`__auto_generated_subquery_name` prefix), so for consistency prefer 
`toSQLId(temporalIdent.nameParts)` here (may need the corresponding import).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to