[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933738

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,126 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
+The resulting table `table_b` will be equivalent to create the table and insert the data with the following statement:

Review Comment:
Add a NOTE about using the `primary-key` and `partition` options.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
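The NOTE being requested would presumably show how a CTAS statement carries primary key and partition definitions through table options, since the CTAS syntax itself cannot declare constraints. A sketch under that assumption (the option names come from this PR's diff; the table and column names are hypothetical):

```sql
-- Hypothetical sketch: CTAS cannot declare constraints directly, so the
-- primary key and partition are supplied through table options instead.
CREATE TABLE table_b WITH (
    'primary-key' = 'id',
    'partition' = 'dt'
) AS SELECT id, name, dt FROM table_a;
```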
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933424

File: flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -474,6 +474,20 @@ public class CoreOptions implements Serializable {
                                 + "$hour:00:00'."))
                 .build());
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Define primary key by table options, cannot define primary key on DDL and table options at the same time.");

Review Comment:
In which case could the user define the primary key in DDL and in table options at the same time? IMO, you need to explain the case clearly.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933201

File: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -123,6 +123,40 @@ public TableSchema createTable(Schema schema) throws Exception {
         Map<String, String> options = schema.options();
         int highestFieldId = RowType.currentHighestFieldId(fields);
+        List<String> columnNames =
+                schema.fields().stream().map(DataField::name).collect(Collectors.toList());
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+            if (!primaryKeys.isEmpty()) {

Review Comment:
The validation should be added into `SchemaValidation#validateTableSchema`. Meanwhile, add unit tests for this validation.
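The check the reviewer wants moved into `SchemaValidation#validateTableSchema` can be sketched in isolation. This is a hypothetical mock, not the actual flink-table-store code: the class name, the method `resolvePrimaryKeys`, and its signature are invented for illustration; only the option key `primary-key` and the error condition come from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PrimaryKeyOptionValidation {

    /**
     * Hypothetical sketch of the validation under review: a primary key may
     * come from the DDL or from the 'primary-key' table option, but not both,
     * and every column named in the option must exist in the schema.
     */
    public static List<String> resolvePrimaryKeys(
            List<String> ddlPrimaryKeys,
            Map<String, String> options,
            List<String> columnNames) {
        String optionValue = options.get("primary-key");
        if (optionValue == null) {
            // No option given: keep whatever the DDL declared (possibly empty).
            return ddlPrimaryKeys;
        }
        if (!ddlPrimaryKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot define primary key on DDL and table options at the same time.");
        }
        List<String> keys = new ArrayList<>();
        for (String key : optionValue.split(",")) {
            String trimmed = key.trim();
            if (!columnNames.contains(trimmed)) {
                throw new IllegalArgumentException(
                        "Primary key column '" + trimmed + "' is not defined in the schema.");
            }
            keys.add(trimmed);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("dt", "hh", "value");
        // Primary key defined only via the table option: accepted.
        System.out.println(
                resolvePrimaryKeys(
                        Collections.emptyList(),
                        Collections.singletonMap("primary-key", "dt,hh"),
                        columns));
        // Primary key defined in both DDL and options: rejected.
        try {
            resolvePrimaryKeys(
                    Arrays.asList("dt"),
                    Collections.singletonMap("primary-key", "dt"),
                    columns);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the logic in a pure function like this also makes the unit tests the reviewer asks for trivial to write.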
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112631890

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,79 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
+The resulting table `table_b` will be equivalent to create the table and insert the data with the following statement:
+`CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;`
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// For streaming mode, you need to enable the checkpoint.

Review Comment:
LGTM.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112563412

File: flink-table-store-connector/pom.xml
@@ -151,6 +151,27 @@ under the License.
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>

Review Comment:
Could the scope be set to `test`?
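If the dependency is indeed only exercised by tests, the change the reviewer suggests would look like this (a sketch; whether `test` scope actually suffices depends on whether any main code path needs the artifact at compile time):

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <!-- test scope keeps the artifact off the compile and runtime classpaths -->
    <scope>test</scope>
</dependency>
```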
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112563022

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,79 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
+The resulting table `table_b` will be equivalent to create the table and insert the data with the following statement:
+`CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;`
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// For streaming mode, you need to enable the checkpoint.

Review Comment:
Could this use a NOTE to explain? The `//` comment style doesn't look good to me.
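A NOTE in this docs tree would presumably use the same Hugo hint shortcode that already appears in `creating-tables.md`; the `info` level is an assumption:

```
{{< hint info >}}
For streaming mode, you need to enable the checkpoint.
{{< /hint >}}
```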
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r831443

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,78 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Tables can also be created and populated by the results of a query.
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create target table first and then start a `INSERT INTO target_table SELECT * FROM source_table` job,
+// for batch mode, the job will exit when the job finished, for streaming mode, the job will not exit.

Review Comment:
@zhangjun0x01, because the explanation is in the `Flink` tab, you could simply note that the checkpoint should be enabled for streaming mode; there is no need to explain the job behavior. BTW, you could add a `Spark3` tab for `Create Table As`, because you have added a test case for `Create Table As` in `SparkReadITCase`.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r378285

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,78 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Tables can also be created and populated by the results of a query.
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create target table first and then start a `INSERT INTO target_table SELECT * FROM source_table` job,
+// for batch mode, the job will exit when the job finished, for streaming mode, the job will not exit.

Review Comment:
There is no need to explain the job behavior for batch mode and streaming mode.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r378087

File: docs/content/docs/how-to/creating-tables.md
@@ -114,6 +114,78 @@
 Partition keys must be a subset of primary keys if primary keys are defined.

 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
+## Create Table As
+
+Tables can also be created and populated by the results of a query.
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create target table first and then start a `INSERT INTO target_table SELECT * FROM source_table` job,

Review Comment:
Please explain when to use `CREATE TABLE AS` and describe how its usage differs from `CREATE TABLE LIKE`.
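For reference, the distinction the reviewer asks to document is, in standard Flink SQL terms, roughly the following (a sketch; table names are hypothetical):

```sql
-- CREATE TABLE AS derives the schema from the query and populates the new
-- table with the query result:
CREATE TABLE target_table AS SELECT * FROM source_table;

-- CREATE TABLE LIKE copies the schema (and, depending on the LIKE options,
-- the connector options) of an existing table, but leaves the new table empty:
CREATE TABLE target_table LIKE source_table;
```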