[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement
wuchong commented on a change in pull request #13011: URL: https://github.com/apache/flink/pull/13011#discussion_r616298791 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } +private String buildShowCreateTableRow( +ResolvedCatalogBaseTable table, +ObjectIdentifier sqlIdentifier, +boolean isTemporary) { +CatalogBaseTable.TableKind kind = table.getTableKind(); +if (kind == CatalogBaseTable.TableKind.VIEW) { +throw new TableException( +String.format( +"SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", +sqlIdentifier.asSerializableString())); +} +StringBuilder sb = +new StringBuilder( +String.format( +"CREATE %sTABLE %s (\n", +isTemporary ? "TEMPORARY " : "", +sqlIdentifier.asSerializableString())); +ResolvedSchema schema = table.getResolvedSchema(); +// append columns +sb.append( +schema.getColumns().stream() +.map(column -> String.format("%s%s", printIndent, getColumnString(column))) +.collect(Collectors.joining(",\n"))); +// append watermark spec +if (!schema.getWatermarkSpecs().isEmpty()) { +sb.append(",\n"); +sb.append( +schema.getWatermarkSpecs().stream() +.map( +watermarkSpec -> +String.format( +"%sWATERMARK FOR %s AS %s", +printIndent, +String.join( +".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), +watermarkSpec + .getWatermarkExpression() + .asSummaryString())) +.collect(Collectors.joining("\n"))); +} +// append constraint +if (schema.getPrimaryKey().isPresent()) { +sb.append(",\n"); +sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); +} +sb.append("\n) "); +// append comment +String comment = table.getComment(); +if (StringUtils.isNotEmpty(comment)) { +sb.append(String.format("COMMENT '%s'\n", comment)); +} +// 
append partitions +ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; +if (catalogTable.isPartitioned()) { +sb.append("PARTITIONED BY (") +.append( +catalogTable.getPartitionKeys().stream() +.map(key -> String.format("`%s`", key)) +.collect(Collectors.joining(", "))) +.append(")\n"); +} +// append `with` properties +Map options = table.getOptions(); +sb.append("WITH (\n") +.append( +options.entrySet().stream() +.map( +entry -> +String.format( +"%s'%s' = '%s'", +printIndent, +entry.getKey(), +entry.getValue())) +.collect(Collectors.joining(",\n"))) +.append("\n)\n"); +return sb.toString(); +} + +private String getColumnString(Column column) { +final StringBuilder sb = new StringBuilder(); +sb.append(EncodingUtils.escapeIdentifier(column.getName())); +sb.append(" "); +// skip data type for computed column +if (column instanceof Column.ComputedColumn) { +sb.append( +column.explainExtras() +
[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement
wuchong commented on a change in pull request #13011: URL: https://github.com/apache/flink/pull/13011#discussion_r615716979 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } +private String buildShowCreateTableRow( +ResolvedCatalogBaseTable table, +ObjectIdentifier sqlIdentifier, +boolean isTemporary) { +CatalogBaseTable.TableKind kind = table.getTableKind(); +if (kind == CatalogBaseTable.TableKind.VIEW) { +throw new TableException( +String.format( +"SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", +sqlIdentifier.asSerializableString())); +} +StringBuilder sb = +new StringBuilder( +String.format( +"CREATE %sTABLE %s (\n", +isTemporary ? "TEMPORARY " : "", +sqlIdentifier.asSerializableString())); +ResolvedSchema schema = table.getResolvedSchema(); +// append columns +sb.append( +schema.getColumns().stream() +.map(column -> String.format("%s%s", printIndent, getColumnString(column))) +.collect(Collectors.joining(",\n"))); +// append watermark spec +if (!schema.getWatermarkSpecs().isEmpty()) { +sb.append(",\n"); +sb.append( +schema.getWatermarkSpecs().stream() +.map( +watermarkSpec -> +String.format( +"%sWATERMARK FOR %s AS %s", +printIndent, +String.join( +".", + EncodingUtils.escapeIdentifier( + watermarkSpec + .getRowtimeAttribute())), +watermarkSpec + .getWatermarkExpression() + .asSummaryString())) +.collect(Collectors.joining("\n"))); +} +// append constraint +if (schema.getPrimaryKey().isPresent()) { +sb.append(",\n"); +sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get())); +} +sb.append("\n) "); +// append comment +String comment = table.getComment(); +if (StringUtils.isNotEmpty(comment)) { +sb.append(String.format("COMMENT '%s'\n", comment)); +} +// 
append partitions +ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; +if (catalogTable.isPartitioned()) { +sb.append("PARTITIONED BY (") +.append( +catalogTable.getPartitionKeys().stream() +.map(key -> String.format("`%s`", key)) +.collect(Collectors.joining(", "))) +.append(")\n"); +} +// append `with` properties +Map options = table.getOptions(); +sb.append("WITH (\n") +.append( +options.entrySet().stream() +.map( +entry -> +String.format( +"%s'%s' = '%s'", +printIndent, +entry.getKey(), +entry.getValue())) +.collect(Collectors.joining(",\n"))) +.append("\n)\n"); +return sb.toString(); +} + +private String getColumnString(Column column) { +final StringBuilder sb = new StringBuilder(); +sb.append(EncodingUtils.escapeIdentifier(column.getName())); +sb.append(" "); +// skip data type for computed column +if (column instanceof Column.ComputedColumn) { +sb.append( +column.explainExtras() +
[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement
wuchong commented on a change in pull request #13011: URL: https://github.com/apache/flink/pull/13011#discussion_r615565314 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); } +private String buildShowCreateTableRow( +ResolvedCatalogBaseTable table, +ObjectIdentifier sqlIdentifier, +boolean isTemporary) { +CatalogBaseTable.TableKind kind = table.getTableKind(); +if (kind == CatalogBaseTable.TableKind.VIEW) { +throw new TableException( +String.format( +"SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.", +sqlIdentifier.asSerializableString())); +} +StringBuilder sb = +new StringBuilder( +String.format( +"CREATE %sTABLE %s (\n", +isTemporary ? "TEMPORARY " : "", +sqlIdentifier.asSerializableString())); +ResolvedSchema schema = table.getResolvedSchema(); +// append columns +sb.append( +schema.getColumns().stream() +.map(column -> String.format("%s%s", printIndent, getColumnString(column))) +.collect(Collectors.joining(",\n"))); +// append watermark spec +if (!schema.getWatermarkSpecs().isEmpty()) { +sb.append(",\n"); +sb.append( +schema.getWatermarkSpecs().stream() +.map( +watermarkSpec -> +String.format( +"%sWATERMARK FOR %s AS %s", +printIndent, +String.join( +".", Review comment: I think this is a mistake in `WatermarkSpec#asSummaryString`, we can correct it in this PR or another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wuchong commented on a change in pull request #13011: [FLINK-16384][table sql/client] Support SHOW CREATE TABLE statement
wuchong commented on a change in pull request #13011: URL: https://github.com/apache/flink/pull/13011#discussion_r465552827 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); } + /** +* Returns constraint's canonical summary. All constraints summary will be formatted as +* +* CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition]) NOT ENFORCED +* +* E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED +* +*/ + public final String asCanonicalString() { + final String typeString = getTypeString(); + return String.format("CONSTRAINT %s %s (%s) NOT ENFORCED", + getName(), + typeString, + String.join(", ", columns.stream().map(col -> String.format("`%s`", col)).collect(Collectors.toList()))); Review comment: Use `EncodingUtils#escapeIdentifier` to escape identifiers. ## File path: docs/content/docs/dev/table/sql/show.md ## @@ -427,6 +458,17 @@ SHOW TABLES Show all tables in the current catalog and the current database. + +## SHOW CREATE TABLE + +```sql +SHOW CREATE TABLE +``` + +Show create table statement for specified table. + +Attention Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL. Review comment: minor: ```suggestion Attention Currently `SHOW CREATE TABLE` only supports table that is created by Flink SQL DDL. 
``` ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/UniqueConstraint.java ## @@ -75,21 +76,26 @@ public ConstraintType getType() { */ @Override public final String asSummaryString() { - final String typeString; - switch (getType()) { - case PRIMARY_KEY: - typeString = "PRIMARY KEY"; - break; - case UNIQUE_KEY: - typeString = "UNIQUE"; - break; - default: - throw new IllegalStateException("Unknown key type: " + getType()); - } - + final String typeString = getTypeString(); return String.format("CONSTRAINT %s %s (%s)", getName(), typeString, String.join(", ", columns)); Review comment: I think we should also add `NOT ENFORCED` for the summary string. Currently, it is not correct. Besides, we should add `NOT ENFORCED` according to the underlying `enforced` flag, even though it is always false for now. ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -1072,6 +1086,75 @@ private TableResult buildShowResult(String columnName, String[] objects) { Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new)); } + private TableResult buildShowCreateTableResult(CatalogBaseTable table, ObjectIdentifier sqlIdentifier) { Review comment: I think this must be a `CatalogTable`, because this is `SHOW CREATE TABLE`. We should throw an exception before this if it is a view. 
## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -1015,6 +1018,17 @@ private TableResult executeOperation(Operation operation) { return buildShowResult("database name", listDatabases()); } else if (operation instanceof ShowCurrentDatabaseOperation) { return buildShowResult("current database name", new String[]{catalogManager.getCurrentDatabase()}); + } else if (operation instanceof ShowCreateTableOperation) { + ShowCreateTableOperation showCreateTableOperation = (ShowCreateTableOperation) operation; + Optional