Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-03-08 Thread via GitHub


leonardBang commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4020682305

   
   > I agree this PR was merged too quickly, and could have waited a bit 
longer. The IT has been added in 
[cdbd635](https://github.com/apache/flink-connector-kafka/commit/cdbd635171fc4322ba7182eb93e920472d6d9d91).
   
   Thanks for adding an IT in a later PR, this makes the change look better.
   
   
   > For the other points, my understanding was:
   
   > * I was not aware of a requirement that non-architectural changes must be 
approved by another committer. In the past, I have seen non-committer approvals 
accepted for this type of change. For this PR, I also requested reviews from a 
couple committers based on activity at that time, but did not receive a 
response. I'm happy to include you or anyone else as reviewers in future if you 
can commit to timely reviewing them
   
   Flink community does not require non-architectural changes must be approved 
by another committer, but we suggested[1]. In this case, committer 
loserwang1024 comment that we need an **IT** case, but you reply that we've a 
**UT** case and merge the PR directly, That's why I point out this issue. 
Imaging that you're review other committer's PR and left one comment without 
resolved reply, and he/she merge the PR directly, what's your feeling for the 
community process?  I think it should be a consensus  that we should resolve 
other committers' comment if your PR is reviewing by them and other committer 
left reasonable comment.
   
   [1] Flink Committer Invitation Email template: Being a committer enables you 
to make changes without relying on other committers to commit your changes. 
However, we encourage you to keep the same approach to contributions and keep 
working with peer reviews. Think of the committer status as a recognition of 
your work in the community, that also gives you the ability to now execute the 
final step of a contribution (pushing to the Flink repositories) by yourself.

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-03-07 Thread via GitHub


bowenli86 commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4017489772

   Hi @leonardBang @ferenc-csaky, thanks for raising this.
   
   I agree this PR was merged too quickly, and could have waited a bit longer. 
The IT has been added in 
https://github.com/apache/flink-connector-kafka/commit/cdbd635171fc4322ba7182eb93e920472d6d9d91.
   
   For the other points, my understanding was:
   
   - For straightforward changes, the PR title and linked JIRA have generally 
been treated as sufficient context. For example, commits such as 
https://github.com/apache/flink-connector-kafka/commit/20ca57d7efdcca52b5ce44afbccc1e7ad66ff82b,
 
https://github.com/apache/flink-connector-kafka/commit/f2e6b34ebfd7f6d91874cd8ee9b9563b22fa398d,
 
https://github.com/apache/flink-connector-kafka/commit/a42f6a42928bba801bc75897470562debe4c50c4,
 
https://github.com/apache/flink-connector-kafka/commit/e0969a4ba7b5e23500a68b5aea5824fffe4ed0fa
 and many more that do not include additional descriptions either. For more 
involved changes, I agree that a fuller description is beneficial, and that is 
what I have tried to provide in those cases
   
   - I was not aware of a requirement that non-architectural changes must be 
approved by another committer. In the past, I have seen non-committer approvals 
accepted for this type of change. For this PR, I also requested reviews from a 
couple committers based on activity at that time, but did not receive a 
response. I'm happy to include you or anyone else as reviewers in future if you 
can commit to timely reviewing them
   
   - This was not a blind approval. The JIRA and PR describe the motivation and 
proposed solution in details, comments were addressed (lots are done offline), 
and the change passed CI.
   
   It has been some time since I last committed, if the project’s routines or 
expectations have changed, please point me to the current standard. I fully 
support improving the process, e.g. if we want stricter requirements such as 
mandatory descriptions for all commits, that should be applied consistently to 
all commits across the repo


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-03-06 Thread via GitHub


ferenc-csaky commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-4011560886

   @bowenli86 I agree with @leonardBang, and you have 3-4 PRs recently that has 
no or minimal context, 1 supposedly blind approve from a non-committer and then 
merged. I won't start to revert these, as after a quick look they do not seem 
problematic to me (I could be wrong, I did not spend too much time checking), 
but this is not how we should operate. It would be helpful to address the 
things Leonard mentioned.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


leonardBang commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3775954183

   > Is there any ITCase for table and SQL
   
   Hi, I noticed this PR was merged without fully following our community’s 
merging guidelines. Specifically:
   (a) There was no PR description explaining the purpose or design of the 
change.
 (b) Several reviewers (including multiple committers) left unresolved 
comments, yet none of them gave a formal +1.
 (c) The PR still lacks an integration test (ITCase). For a new connector 
like `dynamic-kafka`, we should include a Flink SQL job in the IT suite to 
validate end-to-end behavior and ensure correctness.
 
   Could you kindly resolve my concerns @bowenli86 ?  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli86 merged PR #212:
URL: https://github.com/apache/flink-connector-kafka/pull/212


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli86 commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774921720

   merging


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli86 commented on code in PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709844507


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java:
##
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.connector

Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli86 commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774657090

   > Is there any ITCase for table and SQL? In kafka sql connector, we add many 
itcase for read、restore and many case in production.
   
   yes, it's covered in DynamicKafkaTableFactoryTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli-oai commented on code in PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709829889


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java:
##
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.connect

Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli-oai commented on PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#issuecomment-3774642344

   > Is there any ITCase for table and SQL? In kafka sql connector, we add many 
itcase for read、restore and many case in production.
   yes, it's covered in DynamicKafkaTableFactoryTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-20 Thread via GitHub


bowenli-oai commented on code in PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2709829889


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java:
##
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.connect

Re: [PR] [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API [flink-connector-kafka]

2026-01-15 Thread via GitHub


loserwang1024 commented on code in PR #212:
URL: 
https://github.com/apache/flink-connector-kafka/pull/212#discussion_r2696646769


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java:
##
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.conne