Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
leonardBang commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4020682305 > I agree this PR was merged too quickly, and could have waited a bit longer. The IT has been added in [cdbd635](https://github.com/apache/flink-connector-kafka/commit/cdbd635171fc4322ba7182eb93e920472d6d9d91). Thanks for adding an IT in a later PR, this makes the change look better. > For the other points, my understanding was: > * I was not aware of a requirement that non-architectural changes must be approved by another committer. In the past, I have seen non-committer approvals accepted for this type of change. For this PR, I also requested reviews from a couple committers based on activity at that time, but did not receive a response. I'm happy to include you or anyone else as reviewers in future if you can commit to timely reviewing them Flink community does not require non-architectural changes must be approved by another committer, but we suggested[1]. In this case, committer loserwang1024 comment that we need an **IT** case, but you reply that we've a **UT** case and merge the PR directly, That's why I point out this issue. Imaging that you're review other committer's PR and left one comment without resolved reply, and he/she merge the PR directly, what's your feeling for the community process? I think it should be a consensus that we should resolve other committers' comment if your PR is reviewing by them and other committer left reasonable comment. [1] Flink Committer Invitation Email template: Being a committer enables you to make changes without relying on other committers to commit your changes. However, we encourage you to keep the same approach to contributions and keep working with peer reviews. Think of the committer status as a recognition of your work in the community, that also gives you the ability to now execute the final step of a contribution (pushing to the Flink repositories) by yourself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli86 commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4017489772 Hi @leonardBang @ferenc-csaky, thanks for raising this. I agree this PR was merged too quickly, and could have waited a bit longer. The IT has been added in https://github.com/apache/flink-connector-kafka/commit/cdbd635171fc4322ba7182eb93e920472d6d9d91. For the other points, my understanding was: - For straightforward changes, the PR title and linked JIRA have generally been treated as sufficient context. For example, commits such as https://github.com/apache/flink-connector-kafka/commit/20ca57d7efdcca52b5ce44afbccc1e7ad66ff82b, https://github.com/apache/flink-connector-kafka/commit/f2e6b34ebfd7f6d91874cd8ee9b9563b22fa398d, https://github.com/apache/flink-connector-kafka/commit/a42f6a42928bba801bc75897470562debe4c50c4, https://github.com/apache/flink-connector-kafka/commit/e0969a4ba7b5e23500a68b5aea5824fffe4ed0fa and many more that do not include additional descriptions either. For more involved changes, I agree that a fuller description is beneficial, and that is what I have tried to provide in those cases - I was not aware of a requirement that non-architectural changes must be approved by another committer. In the past, I have seen non-committer approvals accepted for this type of change. For this PR, I also requested reviews from a couple committers based on activity at that time, but did not receive a response. I'm happy to include you or anyone else as reviewers in future if you can commit to timely reviewing them - This was not a blind approval. The JIRA and PR describe the motivation and proposed solution in details, comments were addressed (lots are done offline), and the change passed CI. It has been some time since I last committed, if the project’s routines or expectations have changed, please point me to the current standard. I fully support improving the process, e.g. if we want stricter requirements such as mandatory descriptions for all commits, that should be applied consistently to all commits across the repo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
ferenc-csaky commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4011560886 @bowenli86 I agree with @leonardBang, and you have 3-4 PRs recently that has no or minimal context, 1 supposedly blind approve from a non-committer and then merged. I won't start to revert these, as after a quick look they do not seem problematic to me (I could be wrong, I did not spend too much time checking), but this is not how we should operate. It would be helpful to address the things Leonard mentioned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
leonardBang commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3775954183 > Is there any ITCase for table and SQL Hi, I noticed this PR was merged without fully following our community’s merging guidelines. Specifically: (a) There was no PR description explaining the purpose or design of the change. (b) Several reviewers (including multiple committers) left unresolved comments, yet none of them gave a formal +1. (c) The PR still lacks an integration test (ITCase). For a new connector like `dynamic-kafka`, we should include a Flink SQL job in the IT suite to validate end-to-end behavior and ensure correctness. Could you kindly resolve my concerns @bowenli86 ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli86 merged PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli86 commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774921720 merging -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli86 commented on code in PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709844507 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java: ## @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService; +import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connector
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli86 commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774657090 > Is there any ITCase for table and SQL? In kafka sql connector, we add many itcase for read、restore and many case in production. yes, it's covered in DynamicKafkaTableFactoryTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli-oai commented on code in PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709829889 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java: ## @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService; +import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connect
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli-oai commented on PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774642344 > Is there any ITCase for table and SQL? In kafka sql connector, we add many itcase for read、restore and many case in production. yes, it's covered in DynamicKafkaTableFactoryTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
bowenli-oai commented on code in PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709829889 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java: ## @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService; +import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connect
Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]
loserwang1024 commented on code in PR #212: URL: https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2696646769 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java: ## @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService; +import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS; +import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.conne
