This link describes the usage of [Scan Newly Added Tables]: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables . Is it possible to use it without restarting the job? I have tried the patch below, which runs a scheduled task in MySqlSnapshotSplitAssigner#open(); when tables are added more than twice, this issue occurs: https://github.com/apache/flink-cdc/issues/2282
.../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 26 +++++++++++++++++++++++--- .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java | 5 +++-- 2 files changed, 26 insertions(+), 5 deletions(-) diff --cc flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.jav index 0536a262,0536a262..d52acc26 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ++import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; ++import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne private MySqlChunkSplitter chunkSplitter; private boolean isTableIdCaseSensitive; private ExecutorService executor; ++ private ScheduledExecutorService scheduledExecutor; @Nullable private Long checkpointIdToFinish; @@@ -179,12 -179,12 +182,24 @@@ @Override public void open() { chunkSplitter.open(); -- discoveryCaptureTables(); -- captureNewlyAddedTables(); -- startAsynchronouslySplit(); ++ if (scheduledExecutor == null) { ++ ThreadFactory threadFactory = ++ new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build(); ++ 
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); ++ } ++ scheduledExecutor.scheduleAtFixedRate( ++ () -> { ++ discoveryCaptureTables(); ++ captureNewlyAddedTables(); ++ startAsynchronouslySplit(); ++ }, ++ 0, ++ 1, ++ TimeUnit.MINUTES); } private void discoveryCaptureTables() { ++ LOG.info("start discovery capture tables"); // discovery the tables lazily if (needToDiscoveryTables()) { long start = System.currentTimeMillis(); @@@ -216,6 -216,6 +231,7 @@@ } private void captureNewlyAddedTables() { ++ LOG.info("start to capture newly added tables"); if (sourceConfig.isScanNewlyAddedTableEnabled()) { // check whether we got newly added tables try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { @@@ -282,6 -282,6 +298,7 @@@ } private void startAsynchronouslySplit() { ++ LOG.info("start asynchronously split"); if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) { if (executor == null) { ThreadFactory threadFactory = @@@ -497,6 -497,6 +514,9 @@@ if (executor != null) { executor.shutdown(); } ++ if (scheduledExecutor != null) { ++ scheduledExecutor.shutdown(); ++ } }