[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement

2021-04-19 Thread GitBox


wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r616298791



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
 }
 
+private String buildShowCreateTableRow(
+ResolvedCatalogBaseTable table,
+ObjectIdentifier sqlIdentifier,
+boolean isTemporary) {
+CatalogBaseTable.TableKind kind = table.getTableKind();
+if (kind == CatalogBaseTable.TableKind.VIEW) {
+throw new TableException(
+String.format(
+"SHOW CREATE TABLE does not support showing CREATE 
VIEW statement with identifier %s.",
+sqlIdentifier.asSerializableString()));
+}
+StringBuilder sb =
+new StringBuilder(
+String.format(
+"CREATE %sTABLE %s (\n",
+isTemporary ? "TEMPORARY " : "",
+sqlIdentifier.asSerializableString()));
+ResolvedSchema schema = table.getResolvedSchema();
+// append columns
+sb.append(
+schema.getColumns().stream()
+.map(column -> String.format("%s%s", printIndent, 
getColumnString(column)))
+.collect(Collectors.joining(",\n")));
+// append watermark spec
+if (!schema.getWatermarkSpecs().isEmpty()) {
+sb.append(",\n");
+sb.append(
+schema.getWatermarkSpecs().stream()
+.map(
+watermarkSpec ->
+String.format(
+"%sWATERMARK FOR %s AS %s",
+printIndent,
+String.join(
+".",
+
EncodingUtils.escapeIdentifier(
+
watermarkSpec
+
.getRowtimeAttribute())),
+watermarkSpec
+
.getWatermarkExpression()
+
.asSummaryString()))
+.collect(Collectors.joining("\n")));
+}
+// append constraint
+if (schema.getPrimaryKey().isPresent()) {
+sb.append(",\n");
+sb.append(String.format("%s%s", printIndent, 
schema.getPrimaryKey().get()));
+}
+sb.append("\n) ");
+// append comment
+String comment = table.getComment();
+if (StringUtils.isNotEmpty(comment)) {
+sb.append(String.format("COMMENT '%s'\n", comment));
+}
+// append partitions
+ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+if (catalogTable.isPartitioned()) {
+sb.append("PARTITIONED BY (")
+.append(
+catalogTable.getPartitionKeys().stream()
+.map(key -> String.format("`%s`", key))
+.collect(Collectors.joining(", ")))
+.append(")\n");
+}
+// append `with` properties
+Map options = table.getOptions();
+sb.append("WITH (\n")
+.append(
+options.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s'%s' = '%s'",
+printIndent,
+entry.getKey(),
+entry.getValue()))
+.collect(Collectors.joining(",\n")))
+.append("\n)\n");
+return sb.toString();
+}
+
+private String getColumnString(Column column) {
+final StringBuilder sb = new StringBuilder();
+sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+sb.append(" ");
+// skip data type for computed column
+if (column instanceof Column.ComputedColumn) {
+sb.append(
+column.explainExtras()
+ 

[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement

2021-04-19 Thread GitBox


wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r615716979



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
 }
 
+private String buildShowCreateTableRow(
+ResolvedCatalogBaseTable table,
+ObjectIdentifier sqlIdentifier,
+boolean isTemporary) {
+CatalogBaseTable.TableKind kind = table.getTableKind();
+if (kind == CatalogBaseTable.TableKind.VIEW) {
+throw new TableException(
+String.format(
+"SHOW CREATE TABLE does not support showing CREATE 
VIEW statement with identifier %s.",
+sqlIdentifier.asSerializableString()));
+}
+StringBuilder sb =
+new StringBuilder(
+String.format(
+"CREATE %sTABLE %s (\n",
+isTemporary ? "TEMPORARY " : "",
+sqlIdentifier.asSerializableString()));
+ResolvedSchema schema = table.getResolvedSchema();
+// append columns
+sb.append(
+schema.getColumns().stream()
+.map(column -> String.format("%s%s", printIndent, 
getColumnString(column)))
+.collect(Collectors.joining(",\n")));
+// append watermark spec
+if (!schema.getWatermarkSpecs().isEmpty()) {
+sb.append(",\n");
+sb.append(
+schema.getWatermarkSpecs().stream()
+.map(
+watermarkSpec ->
+String.format(
+"%sWATERMARK FOR %s AS %s",
+printIndent,
+String.join(
+".",
+
EncodingUtils.escapeIdentifier(
+
watermarkSpec
+
.getRowtimeAttribute())),
+watermarkSpec
+
.getWatermarkExpression()
+
.asSummaryString()))
+.collect(Collectors.joining("\n")));
+}
+// append constraint
+if (schema.getPrimaryKey().isPresent()) {
+sb.append(",\n");
+sb.append(String.format("%s%s", printIndent, 
schema.getPrimaryKey().get()));
+}
+sb.append("\n) ");
+// append comment
+String comment = table.getComment();
+if (StringUtils.isNotEmpty(comment)) {
+sb.append(String.format("COMMENT '%s'\n", comment));
+}
+// append partitions
+ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+if (catalogTable.isPartitioned()) {
+sb.append("PARTITIONED BY (")
+.append(
+catalogTable.getPartitionKeys().stream()
+.map(key -> String.format("`%s`", key))
+.collect(Collectors.joining(", ")))
+.append(")\n");
+}
+// append `with` properties
+Map options = table.getOptions();
+sb.append("WITH (\n")
+.append(
+options.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s'%s' = '%s'",
+printIndent,
+entry.getKey(),
+entry.getValue()))
+.collect(Collectors.joining(",\n")))
+.append("\n)\n");
+return sb.toString();
+}
+
+private String getColumnString(Column column) {
+final StringBuilder sb = new StringBuilder();
+sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+sb.append(" ");
+// skip data type for computed column
+if (column instanceof Column.ComputedColumn) {
+sb.append(
+column.explainExtras()
+ 

[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement

2021-04-19 Thread GitBox


wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r615565314



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
 Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
 }
 
+private String buildShowCreateTableRow(
+ResolvedCatalogBaseTable table,
+ObjectIdentifier sqlIdentifier,
+boolean isTemporary) {
+CatalogBaseTable.TableKind kind = table.getTableKind();
+if (kind == CatalogBaseTable.TableKind.VIEW) {
+throw new TableException(
+String.format(
+"SHOW CREATE TABLE does not support showing CREATE 
VIEW statement with identifier %s.",
+sqlIdentifier.asSerializableString()));
+}
+StringBuilder sb =
+new StringBuilder(
+String.format(
+"CREATE %sTABLE %s (\n",
+isTemporary ? "TEMPORARY " : "",
+sqlIdentifier.asSerializableString()));
+ResolvedSchema schema = table.getResolvedSchema();
+// append columns
+sb.append(
+schema.getColumns().stream()
+.map(column -> String.format("%s%s", printIndent, 
getColumnString(column)))
+.collect(Collectors.joining(",\n")));
+// append watermark spec
+if (!schema.getWatermarkSpecs().isEmpty()) {
+sb.append(",\n");
+sb.append(
+schema.getWatermarkSpecs().stream()
+.map(
+watermarkSpec ->
+String.format(
+"%sWATERMARK FOR %s AS %s",
+printIndent,
+String.join(
+".",

Review comment:
   I think this is a mistake in `WatermarkSpec#asSummaryString`; we can 
correct it in this PR or in a separate PR. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement

2021-04-17 Thread GitBox


wuchong commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r465552827



##
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##
@@ -75,21 +76,26 @@ public ConstraintType getType() {
 */
@Override
public final String asSummaryString() {
-   final String typeString;
-   switch (getType()) {
-   case PRIMARY_KEY:
-   typeString = "PRIMARY KEY";
-   break;
-   case UNIQUE_KEY:
-   typeString = "UNIQUE";
-   break;
-   default:
-   throw new IllegalStateException("Unknown key 
type: " + getType());
-   }
-
+   final String typeString = getTypeString();
return String.format("CONSTRAINT %s %s (%s)", getName(), 
typeString, String.join(", ", columns));
}
 
+   /**
+* Returns constraint's canonical summary. All constraints summary will 
be formatted as
+* 
+* CONSTRAINT [constraint-name] [constraint-type] 
([constraint-definition]) NOT ENFORCED
+*
+* E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
+* 
+*/
+   public final String asCanonicalString() {
+   final String typeString = getTypeString();
+   return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED",
+   getName(),
+   typeString,
+   String.join(", ", columns.stream().map(col -> 
String.format("`%s`", col)).collect(Collectors.toList())));

Review comment:
   Use `EncodingUtils#escapeIdentifier` to escape identifiers. 

##
File path: docs/content/docs/dev/table/sql/show.md
##
@@ -427,6 +458,17 @@ SHOW TABLES
 
 Show all tables in the current catalog and the current database.
 
+
+## SHOW CREATE TABLE
+
+```sql
+SHOW CREATE TABLE
+```
+
+Show create table statement for specified table.
+
+Attention Currently `SHOW CREATE 
TABLE` only supports table that is created by Flink SQL. 

Review comment:
   minor:
   
   ```suggestion
   Attention Currently `SHOW CREATE 
TABLE` only supports table that is created by Flink SQL DDL. 
   ```

##
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java
##
@@ -75,21 +76,26 @@ public ConstraintType getType() {
 */
@Override
public final String asSummaryString() {
-   final String typeString;
-   switch (getType()) {
-   case PRIMARY_KEY:
-   typeString = "PRIMARY KEY";
-   break;
-   case UNIQUE_KEY:
-   typeString = "UNIQUE";
-   break;
-   default:
-   throw new IllegalStateException("Unknown key 
type: " + getType());
-   }
-
+   final String typeString = getTypeString();
return String.format("CONSTRAINT %s %s (%s)", getName(), 
typeString, String.join(", ", columns));

Review comment:
   I think we should also add `NOT ENFORCED` to the summary string. 
Currently, it is not correct. 
   Besides, we should add `NOT ENFORCED` according to the underlying `enforced` 
flag, even though it is always false for now. 

##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, 
String[] objects) {
Arrays.stream(objects).map((c) -> new 
String[]{c}).toArray(String[][]::new));
}
 
+   private TableResult buildShowCreateTableResult(CatalogBaseTable table, 
ObjectIdentifier sqlIdentifier) {

Review comment:
   I think this must be a `CatalogTable`, because this is `SHOW CREATE 
TABLE`.  We should throw an exception before this if it is a view. 

##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -1015,6 +1018,17 @@ private TableResult executeOperation(Operation 
operation) {
return buildShowResult("database name", 
listDatabases());
} else if (operation instanceof ShowCurrentDatabaseOperation) {
return buildShowResult("current database name", new 
String[]{catalogManager.getCurrentDatabase()});
+   } else if (operation instanceof ShowCreateTableOperation) {
+   ShowCreateTableOperation showCreateTableOperation = 
(ShowCreateTableOperation) operation;
+   Optional