rdblue commented on a change in pull request #28026: [SPARK-31257][SQL] Unify create table syntax (WIP)
URL: https://github.com/apache/spark/pull/28026#discussion_r400540742
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
##########
@@ -2715,24 +2729,198 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(filtered, path)
}
+  /**
+   * Create a [[SerdeInfo]] for creating tables.
+   *
+   * Format: STORED AS (name | INPUTFORMAT input_format OUTPUTFORMAT output_format)
+   */
+  override def visitCreateFileFormat(ctx: CreateFileFormatContext): SerdeInfo = withOrigin(ctx) {
+    (ctx.fileFormat, ctx.storageHandler) match {
+      // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
+      case (c: TableFileFormatContext, null) =>
+        SerdeInfo(formatClasses = Some((string(c.inFmt), string(c.outFmt))))
+      // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
+      case (c: GenericFileFormatContext, null) =>
+        SerdeInfo(storedAs = Some(c.identifier.getText))
+      case (null, storageHandler) =>
+        operationNotAllowed("STORED BY", ctx)
+      case _ =>
+        throw new ParseException("Expected either STORED AS or STORED BY, not both", ctx)
+    }
+  }
+
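As a rough sketch of what this visitor yields (SerdeInfo is added elsewhere in this PR; the stand-in below only mirrors the field names used above and is purely illustrative):

{{{
// Stand-in for the PR's SerdeInfo, with the fields the visitors above populate.
case class SerdeInfo(
    storedAs: Option[String] = None,
    formatClasses: Option[(String, String)] = None,
    serde: Option[String] = None,
    serdeProperties: Map[String, String] = Map.empty)

// STORED AS PARQUET keeps only the generic format name.
val generic = SerdeInfo(storedAs = Some("PARQUET"))

// STORED AS INPUTFORMAT 'in.Class' OUTPUTFORMAT 'out.Class' keeps the class name pair.
val explicit = SerdeInfo(formatClasses = Some(("in.Class", "out.Class")))
}}}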
+  /**
+   * Create a [[SerdeInfo]] used for creating tables.
+   *
+   * Example format:
+   * {{{
+   *   SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
+   * }}}
+   *
+   * OR
+   *
+   * {{{
+   *   DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
+   *   [COLLECTION ITEMS TERMINATED BY char]
+   *   [MAP KEYS TERMINATED BY char]
+   *   [LINES TERMINATED BY char]
+   *   [NULL DEFINED AS char]
+   * }}}
+   */
+  def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
+    ctx match {
+      case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
+      case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
+    }
+  }
+
+  /**
+   * Create SERDE row format name and properties pair.
+   */
+  override def visitRowFormatSerde(ctx: RowFormatSerdeContext): SerdeInfo = withOrigin(ctx) {
+    import ctx._
+    SerdeInfo(
+      serde = Some(string(name)),
+      serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
+  }
+
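For example (the serde class below is just an illustrative name; whatever string follows SERDE is passed through unchanged), ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',') would come out as, in terms of the stand-in SerdeInfo sketched earlier:

{{{
val fromSerdeClause = SerdeInfo(
  serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  serdeProperties = Map("field.delim" -> ","))
}}}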
+  /**
+   * Create a delimited row format properties object.
+   */
+  override def visitRowFormatDelimited(
+      ctx: RowFormatDelimitedContext): SerdeInfo = withOrigin(ctx) {
+    // Collect the entries if any.
+    def entry(key: String, value: Token): Seq[(String, String)] = {
+      Option(value).toSeq.map(x => key -> string(x))
+    }
+    // TODO we need proper support for the NULL format.
+    val entries =
+      entry("field.delim", ctx.fieldsTerminatedBy) ++
+      entry("serialization.format", ctx.fieldsTerminatedBy) ++
+      entry("escape.delim", ctx.escapedBy) ++
+      // The following typo is inherited from Hive...
+      entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
+      entry("mapkey.delim", ctx.keysTerminatedBy) ++
+      Option(ctx.linesSeparatedBy).toSeq.map { token =>
+        val value = string(token)
+        validate(
+          value == "\n",
+          s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
+          ctx)
+        "line.delim" -> value
+      }
+    SerdeInfo(serdeProperties = entries.toMap)
+  }
+
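To make the delimited mapping concrete, a clause such as ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY '#' MAP KEYS TERMINATED BY '=' LINES TERMINATED BY '\n' should produce roughly this property map (an expected-output sketch, not a tested assertion):

{{{
val delimitedProps: Map[String, String] = Map(
  "field.delim" -> ",",
  "serialization.format" -> ",",  // mirrors the field delimiter, as in the code above
  "colelction.delim" -> "#",      // Hive's historical typo, preserved above
  "mapkey.delim" -> "=",
  "line.delim" -> "\n")
}}}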
+  /**
+   * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
+   * and STORED AS.
+   *
+   * The following are allowed. Anything else is not:
+   *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+   *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+   *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+   */
+  protected def validateRowFormatFileFormat(
+      rowFormatCtx: RowFormatContext,
+      createFileFormatCtx: CreateFileFormatContext,
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx == null || createFileFormatCtx == null) {
+      return
+    }
+    (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+      case (_, ffTable: TableFileFormatContext) => // OK
+      case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase(Locale.ROOT) match {
+          case ("sequencefile" | "textfile" | "rcfile") => // OK
+          case fmt =>
+            operationNotAllowed(
+              s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
+              parentCtx)
+        }
+      case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase(Locale.ROOT) match {
+          case "textfile" => // OK
+          case fmt => operationNotAllowed(
+            s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
+        }
+      case _ =>
+        // should never happen
+        def str(ctx: ParserRuleContext): String = {
+          (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
+        }
+        operationNotAllowed(
+          s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
+          parentCtx)
+    }
+  }
+
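The rules above boil down to something like the following list of accepted and rejected combinations (illustrative SQL fragments, not taken from this PR's tests):

{{{
// true = accepted, false = rejected by validateRowFormatFileFormat
val rowFormatStoredAsCases: Seq[(String, Boolean)] = Seq(
  "ROW FORMAT SERDE 'x.Serde' STORED AS SEQUENCEFILE" -> true,
  "ROW FORMAT SERDE 'x.Serde' STORED AS RCFILE" -> true,
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE" -> true,
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'in.Class' OUTPUTFORMAT 'out.Class'" -> true,
  "ROW FORMAT SERDE 'x.Serde' STORED AS PARQUET" -> false,  // parquet already implies a serde
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS ORC" -> false)
}}}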
+  protected def validateRowFormatFileFormat(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+      validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
+    }
+  }
+
   override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
     checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
     checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
     checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
Review comment:
I'll fix it. It's an artifact of porting to master.