this link has describe the usage for [Scan Newly Added Tables]
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
.
if we can use if without restarting job. i have try this patch, use a
schedule task in MysqlSnapshotSplitAssigner#open(), when added table more
than twice, it occur this issue
https://github.com/apache/flink-cdc/issues/2282


 
.../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
                 | 26 +++++++++++++++++++++++---
 
.../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
 |  5 +++--
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --cc
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.jav
index 0536a262,0536a262..d52acc26
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.TimeUnit;
  import java.util.stream.Collectors;

  import static
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
@@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
      private MySqlChunkSplitter chunkSplitter;
      private boolean isTableIdCaseSensitive;
      private ExecutorService executor;
++    private ScheduledExecutorService scheduledExecutor;

      @Nullable private Long checkpointIdToFinish;

@@@ -179,12 -179,12 +182,24 @@@
      @Override
      public void open() {
          chunkSplitter.open();
--        discoveryCaptureTables();
--        captureNewlyAddedTables();
--        startAsynchronouslySplit();
++        if (scheduledExecutor == null) {
++            ThreadFactory threadFactory =
++                    new
ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
++            this.scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(threadFactory);
++        }
++        scheduledExecutor.scheduleAtFixedRate(
++                () -> {
++                    discoveryCaptureTables();
++                    captureNewlyAddedTables();
++                    startAsynchronouslySplit();
++                },
++                0,
++                1,
++                TimeUnit.MINUTES);
      }

      private void discoveryCaptureTables() {
++        LOG.info("start discovery capture tables");
          // discovery the tables lazily
          if (needToDiscoveryTables()) {
              long start = System.currentTimeMillis();
@@@ -216,6 -216,6 +231,7 @@@
      }

      private void captureNewlyAddedTables() {
++        LOG.info("start to capture newly added tables");
          if (sourceConfig.isScanNewlyAddedTableEnabled()) {
              // check whether we got newly added tables
              try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
@@@ -282,6 -282,6 +298,7 @@@
      }

      private void startAsynchronouslySplit() {
++        LOG.info("start asynchronously split");
          if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
              if (executor == null) {
                  ThreadFactory threadFactory =
@@@ -497,6 -497,6 +514,9 @@@
          if (executor != null) {
              executor.shutdown();
          }
++        if (scheduledExecutor != null) {
++            scheduledExecutor.shutdown();
++        }
      }

Reply via email to