Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-05-09 Thread via GitHub


chengcongchina commented on code in PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#discussion_r3212675546


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##
@@ -1350,6 +1363,15 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
         return null;
     }
 
+    private TableIdFilter getTableIdDeserializationFilter() {
+        return tableId -> {
+            // since only subscribed table is recording schema, the result could be null
+            TableId table = taskContext.getSchema().getTableId(tableId);
+            return table != null
+                    && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table);

Review Comment:
   @lvyanquan Good point. With the current Debezium `MySqlDatabaseSchema`
   behavior, `taskContext.getSchema().getTableId(tableNumber)` returns non-null
   only when the table mapping was assigned for a known table schema, so
   `table != null` is effectively enough to exclude unsubscribed tables today.

   That said, I’m intentionally keeping the additional
   `connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table)`
   check as a defensive guard. The `getTableId(tableNumber)` API does not
   formally guarantee that null always means “unsubscribed/unmatched”; that is
   just how the current implementation behaves (it depends on how
   schema/history is populated). If schema/history handling changes in the
   future (e.g., more tables’ schemas are loaded than the subscribed set),
   `getTableId()` could return non-null for tables we still don’t want to
   deserialize. The explicit `isIncluded(table)` check keeps the intent (“only
   deserialize captured tables”) robust against such changes.
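
   For illustration, a minimal sketch of the two-step check described above. The
   map and set are hypothetical stand-ins for the Debezium schema lookup and the
   table filter; they are assumptions made for this example, not the connector's
   real types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.LongPredicate;

/** Sketch only: the map and set are hypothetical stand-ins for the schema and table filters. */
class DefensiveTableIdFilterSketch {

    /**
     * Accepts a binlog table number only if (1) the schema knows a table name
     * for it and (2) that name belongs to the captured set.
     */
    static LongPredicate buildFilter(Map<Long, String> tableNumberToName, Set<String> captured) {
        return tableNumber -> {
            String table = tableNumberToName.get(tableNumber); // may be null
            return table != null && captured.contains(table);
        };
    }

    public static void main(String[] args) {
        Map<Long, String> schema = new HashMap<>();
        schema.put(1L, "db.orders");
        // Simulates a future in which the schema also knows an unsubscribed table.
        schema.put(2L, "db.audit_log");

        LongPredicate filter = buildFilter(schema, Set.of("db.orders"));
        System.out.println(filter.test(1L)); // captured table
        System.out.println(filter.test(2L)); // known to the schema, but not captured
        System.out.println(filter.test(3L)); // unknown table number
    }
}
```

   In this sketch, table number 2 is the case the null check alone would let
   through; the second condition is what rejects it.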



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]




Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-31 Thread via GitHub


lvyanquan commented on code in PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#discussion_r3014251314


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##
@@ -1350,6 +1363,15 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
         return null;
     }
 
+    private TableIdFilter getTableIdDeserializationFilter() {
+        return tableId -> {
+            // since only subscribed table is recording schema, the result could be null
+            TableId table = taskContext.getSchema().getTableId(tableId);
+            return table != null
+                    && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table);

Review Comment:
   I'm wondering if this filter is necessary. The validation should have 
already been done in 
[MySqlStreamingChangeEventSource#handleUpdateTableMetadata](https://github.com/apache/flink-cdc/blob/ec244f195b2f7f7f7dc63da4514780b015df0424/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java#L726).






Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-27 Thread via GitHub


chengcongchina commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4146499902

   I’ve added a configuration option to keep this optimization disabled by 
default. Users can choose to enable it manually, and I’ve also updated the 
corresponding documentation. Could @lvyanquan @ThorneANN please take another 
look?





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-27 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4141205901

   > After discussing with @lvyanquan , we agreed not to enable this 
optimization feature too aggressively right away. Instead, we will introduce a 
configuration option that is disabled by default, and turn it on in a future 
release after validating it through real-world usage.
   
   Maybe. I agree with this point.





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-27 Thread via GitHub


chengcongchina commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4141196727

   After discussing with @lvyanquan , we agreed not to enable this optimization 
feature too aggressively right away. Instead, we will introduce a configuration 
option that is disabled by default, and turn it on in a future release after 
validating it through real-world usage.





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-27 Thread via GitHub


chengcongchina commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4141185949

   > I worked on this change last month; you can check it out at
   > https://github.com/ThorneANN/flink-cdc/tree/serrizer-only-tablels
   
   During binlog parsing, filtering out the parsing of fields for irrelevant 
tables in TableMapEventMetadata is indeed a great optimization opportunity. 
However, doing only that is not enough: if we don’t also apply filtering during 
the subsequent parsing of insert/update/delete row change events, parsing can 
still fail because the metadata column types are actually needed there. 
Therefore, we can first consider merging the parsing optimization in the 
current PR, and then continue optimizing TableMapEventMetadata afterward.
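
   A minimal sketch of the dependency described above, assuming a simplified
   parser in which row events look up column types recorded from table-map
   events. All names here are hypothetical and the logic is illustrative; it is
   not the mysql-binlog-connector implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongPredicate;

/** Illustrative only; this is not the mysql-binlog-connector implementation. */
class RowEventNeedsTableMapSketch {

    /** Column type codes per table id, as recorded from TABLE_MAP events. */
    private final Map<Long, int[]> columnTypesByTableId = new HashMap<>();

    /** Hypothetical filter deciding which table ids are subscribed. */
    private final LongPredicate subscribed;

    RowEventNeedsTableMapSketch(LongPredicate subscribed) {
        this.subscribed = subscribed;
    }

    /** Step 1 (TABLE_MAP): keep column metadata only for subscribed tables. */
    void onTableMap(long tableId, int[] columnTypes) {
        if (subscribed.test(tableId)) {
            columnTypesByTableId.put(tableId, columnTypes);
        }
    }

    /** Row parsing without a filter: fails when the metadata was skipped in step 1. */
    int parseRowUnfiltered(long tableId) {
        int[] types = columnTypesByTableId.get(tableId);
        if (types == null) {
            throw new IllegalStateException("no column metadata for table id " + tableId);
        }
        return types.length; // stand-in for decoding one value per column
    }

    /** Row parsing with the same filter: returns -1 to mean "skipped". */
    int parseRowFiltered(long tableId) {
        return subscribed.test(tableId) ? parseRowUnfiltered(tableId) : -1;
    }

    public static void main(String[] args) {
        RowEventNeedsTableMapSketch parser = new RowEventNeedsTableMapSketch(id -> id == 1L);
        parser.onTableMap(1L, new int[] {3, 15});
        parser.onTableMap(2L, new int[] {3});
        System.out.println(parser.parseRowFiltered(1L)); // 2
        System.out.println(parser.parseRowFiltered(2L)); // -1
    }
}
```

   Calling `parseRowUnfiltered(2L)` in this sketch throws, which mirrors the
   failure mode described above when only the table-map step is filtered.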





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-18 Thread via GitHub


chengcongchina commented on code in PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#discussion_r2958190882


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
+ *
+ * Line 65-70: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
+ */
+public class DeleteRowsEventDataDeserializer
+        extends AbstractRowsEventDataDeserializer<DeleteRowsEventData> {
+
+    private boolean mayContainExtraInformation;
+
+    /** the table id filter to skip further deserialization of unsubscribed table ids. */
+    private TableIdFilter tableIdFilter = TableIdFilter.all();
+
+    public DeleteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+        super(tableMapEventByTableId);
+    }
+
+    public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
+            boolean mayContainExtraInformation) {
+        this.mayContainExtraInformation = mayContainExtraInformation;
+        return this;
+    }
+
+    public DeleteRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {

Review Comment:
   Resolved.
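
   As a rough sketch of how a table id filter could short-circuit a rows-event
   deserializer: the `TableIdFilter` shape below is an assumption inferred from
   the `TableIdFilter.all()` call and the lambda usage in the diff, and the
   6-byte little-endian table id at the start of the event body is assumed from
   the common MySQL rows-event layout. The actual PR code may differ.

```java
/** Sketch only; names and layout are assumptions, not the PR's actual code. */
class TableIdFilterSketch {

    /** Hypothetical shape of the filter; the real interface in the PR may differ. */
    @FunctionalInterface
    interface TableIdFilter {
        boolean test(long tableId);

        /** A filter that accepts every table id, i.e. no skipping. */
        static TableIdFilter all() {
            return tableId -> true;
        }
    }

    /** Reads a 6-byte little-endian table id from the start of a rows-event body. */
    static long readTableId(byte[] body) {
        long id = 0;
        for (int i = 0; i < 6; i++) {
            id |= (body[i] & 0xFFL) << (8 * i);
        }
        return id;
    }

    /** Returns true if the remaining bytes of the body should be deserialized. */
    static boolean shouldDeserialize(byte[] body, TableIdFilter filter) {
        return filter.test(readTableId(body));
    }

    public static void main(String[] args) {
        byte[] body = {0x01, 0x01, 0, 0, 0, 0, 0x7F};
        System.out.println(readTableId(body)); // 257
        System.out.println(shouldDeserialize(body, TableIdFilter.all())); // true
        System.out.println(shouldDeserialize(body, id -> id == 42L)); // false
    }
}
```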






Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-10 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4029838860

   I worked on this change last month; you can check it out at
   https://github.com/ThorneANN/flink-cdc/tree/serrizer-only-tablels





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-10 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4029833108

   ```
   eventDeserializer.setEventDataDeserializer(
           EventType.TABLE_MAP,
           new com.github.shyiko.mysql.binlog.event.deserialization
                   .TableMapEventDataDeserializer(
                   connectorConfig.getTableFilters().dataCollectionFilter()::isIncluded));
   ```

   As the code shows, flink-cdc already skips data for tables that are not
   captured, but it still always deserializes the data event's metadata.




Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-10 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4029777927

   Maybe we could modify the `TableMapEventDataDeserializer` and
   `MySqlStreamingChangeEventSource` classes to achieve this feature?





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-10 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4029733475

   > @ThorneANN Thank you for your comment. In the MySQL binlog protocol, apart 
from events like GTID events, there are mainly two types of events related to 
table names: DDL changes and row data changes (INSERT/UPDATE/DELETE). 
Currently, DDL changes are captured for all tables, partly because downstream 
DDL capture for online DDL tools may require DDL parsing for shadow tables. 
Aside from that, the main deserialization workload in the binlog client lies in 
parsing row data changes. Therefore, adding the table ID filtering logic 
specifically within the deserialization of these row change events is 
sufficient to achieve the optimization.
   
   Yes. What confuses me is that you added many event deserialization classes,
   when I think you only need to filter by table ID.





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-10 Thread via GitHub


chengcongchina commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4029681200

   @ThorneANN Thank you for your comment.
   In the MySQL binlog protocol, apart from events like GTID events, there are 
mainly two types of events related to table names: DDL changes and row data 
changes (INSERT/UPDATE/DELETE).
   Currently, DDL changes are captured for all tables, partly because 
downstream DDL capture for online DDL tools may require DDL parsing for shadow 
tables. Aside from that, the main deserialization workload in the binlog client 
lies in parsing row data changes. Therefore, adding the table ID filtering 
logic specifically within the deserialization of these row change events is 
sufficient to achieve the optimization.
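
   A small sketch of the decision described above, assuming a simplified event
   classification. The `EventKind` enum and method names are hypothetical and
   only illustrate that non-row events bypass the table id filter.

```java
import java.util.function.LongPredicate;

/** Sketch only: EventKind is a simplified, hypothetical classification of binlog events. */
class RowOnlyFilteringSketch {

    enum EventKind { GTID, QUERY, TABLE_MAP, WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS }

    /** Row data changes are the INSERT/UPDATE/DELETE events. */
    static boolean isRowChange(EventKind kind) {
        return kind == EventKind.WRITE_ROWS
                || kind == EventKind.UPDATE_ROWS
                || kind == EventKind.DELETE_ROWS;
    }

    /**
     * Non-row events (including DDL carried in QUERY events) are always
     * deserialized; row changes are deserialized only for subscribed tables.
     */
    static boolean shouldDeserializeBody(EventKind kind, long tableId, LongPredicate subscribed) {
        return !isRowChange(kind) || subscribed.test(tableId);
    }

    public static void main(String[] args) {
        LongPredicate subscribed = id -> id == 1L;
        System.out.println(shouldDeserializeBody(EventKind.QUERY, 2L, subscribed)); // true
        System.out.println(shouldDeserializeBody(EventKind.UPDATE_ROWS, 1L, subscribed)); // true
        System.out.println(shouldDeserializeBody(EventKind.UPDATE_ROWS, 2L, subscribed)); // false
    }
}
```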





Re: [PR] [FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client. [flink-cdc]

2026-03-08 Thread via GitHub


ThorneANN commented on PR #4302:
URL: https://github.com/apache/flink-cdc/pull/4302#issuecomment-4020701458

   Why not filter by tableId instead of by event action?

