Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-13 Thread via GitHub


leonardBang merged PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-13 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2475352297

   > @MOBIN-F Would you like to open a backport PR for `release-3.2` branch?
   
   OK



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-13 Thread via GitHub


leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2475350233

   @MOBIN-F Would you like to open a backport PR for `release-3.2` branch?



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-12 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2472340148

   The OceanBase tests failed, but the failure seems unrelated to this PR.



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


aiwenmo commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2469497337

   > Thanks for cleaning up `PreTransformOperator`! It looks much neater now; I just left some trivial comments.
   > 
   > Also +@aiwenmo, who is familiar with the transform modules.
   
   Hi @MOBIN-F, thanks for the contribution. I will help review the code tonight~



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


MOBIN-F commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1837413936


##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java:
##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
 new StreamRecord<>(
 new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
 }
+
+@Test
+void testMultiTransform() throws Exception {
+PreTransformOperator transform =
+PreTransformOperator.newBuilder()

Review Comment:
   Done~, the comment above is addressed as well, and I added some tests.




Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836241798


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##
@@ -386,37 +436,16 @@ private Schema transformSchemaMetaData(
 return schemaBuilder.build();
 }
 
-private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent)
-throws Exception {
-TableId tableId = dataChangeEvent.tableId();
-for (PreTransformer transform : transforms) {
-Selectors selectors = transform.getSelectors();
-
-if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
-TransformProjection transformProjection = transform.getProjection().get();
-TransformFilter transformFilter = transform.getFilter().orElse(null);
-if (transformProjection.isValid()) {
-return processProjection(transformProjection, transformFilter, dataChangeEvent);
-}
-}
+private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) {
+if (!transforms.isEmpty()) {
+return processProjection(dataChangeEvent);
 }
 return dataChangeEvent;
 }

Review Comment:
   The logic seems too simple to be worth a separate method.




Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836260088


##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java:
##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
 new StreamRecord<>(
 new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
 }
+
+@Test
+void testMultiTransform() throws Exception {
+PreTransformOperator transform =
+PreTransformOperator.newBuilder()

Review Comment:
   Maybe we can also cover the cases where the projection is `*` or omitted here.
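A minimal sketch of what such cases could assert, using a hypothetical, simplified model rather than the real `PreTransformOperator` API: each matching rule is reduced to the list of source columns its projection references, `null` stands for an omitted projection, and a list containing `"*"` stands for an asterisk projection. Under the behaviour described in this PR, both cases (and a table matched by no rule) should keep every column; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, not the flink-cdc API: each element of matchingProjections is
// the column list referenced by one matching rule; null means the projection was omitted.
final class AsteriskProjectionSketch {

    static Set<String> referencedColumns(List<String> schema, List<List<String>> matchingProjections) {
        Set<String> referenced = new LinkedHashSet<>();
        if (matchingProjections.isEmpty()) {
            // No rule matches the table: behave like a "*" projection.
            referenced.addAll(schema);
            return referenced;
        }
        for (List<String> projection : matchingProjections) {
            if (projection == null || projection.contains("*")) {
                // Omitted or asterisk projection: every column is referenced.
                referenced.addAll(schema);
            } else {
                referenced.addAll(projection);
            }
        }
        return referenced;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("col1", "col2", "col3");
        List<String> omitted = null;
        // All three calls print [col1, col2, col3].
        System.out.println(referencedColumns(schema, Arrays.asList(Arrays.asList("col1"), omitted)));
        System.out.println(referencedColumns(schema, Arrays.asList(Arrays.asList("col1"), Arrays.asList("*"))));
        System.out.println(referencedColumns(schema, Collections.<List<String>>emptyList()));
    }
}
```

A rule with an explicit projection such as `col2` alone still narrows the set to `[col2]`, so a test could pair the `*`/omitted cases with one explicit case to show the difference.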




Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836254296


##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java:
##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
 new StreamRecord<>(
 new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
 }
+
+@Test
+void testMultiTransform() throws Exception {
+PreTransformOperator transform =
+PreTransformOperator.newBuilder()
+.addTransform(
+CUSTOMERS_TABLEID.identifier(),
+"col1, upper(col1) col12",
+"col1 = 'col1'",
+"col2",
+"col12",
+"key1=value1,key2=value2")
+.addTransform(
+CUSTOMERS_TABLEID.identifier(),
+"col2, upper(col2) col12",
+"col1 != 'col1'",
+"col2",
+"col12",
+"key1=value1,key2=value2")
+.build();

Review Comment:
   Since this bug depends on the order of the rules, it would be nice to also verify it with the rules swapped.
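A minimal sketch of such an order check, on a hypothetical simplified model (each rule reduced to the source columns its projection and filter reference; primary keys and metadata columns are ignored). The rule columns mirror the two rules in `testMultiTransform` above. Collecting columns into a `LinkedHashSet`, as the PR does with `referencedColumnsSet`, yields the same set for either order, but its iteration order follows the rule order, which a real test may want to pin down explicitly.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, not the flink-cdc API.
final class RuleOrderSketch {

    // Union of the columns referenced by every matching rule, in first-seen order.
    static Set<String> referencedColumns(List<List<String>> matchingRules) {
        Set<String> referenced = new LinkedHashSet<>();
        for (List<String> ruleColumns : matchingRules) {
            referenced.addAll(ruleColumns);
        }
        return referenced;
    }

    public static void main(String[] args) {
        // Rule 1: projection "col1, upper(col1) col12", filter "col1 = 'col1'".
        List<String> rule1 = Arrays.asList("col1");
        // Rule 2: projection "col2, upper(col2) col12", filter "col1 != 'col1'".
        List<String> rule2 = Arrays.asList("col2", "col1");

        Set<String> forward = referencedColumns(Arrays.asList(rule1, rule2));
        Set<String> swapped = referencedColumns(Arrays.asList(rule2, rule1));

        System.out.println(forward.equals(swapped)); // true: Set.equals ignores order
        System.out.println(forward);                 // [col1, col2]
        System.out.println(swapped);                 // [col2, col1]
    }
}
```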





Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836206210


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##
@@ -342,27 +342,77 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
 }
 }
 
+cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+if (preTransformProcessorMap.containsKey(tableId)) {
+return preTransformProcessorMap
+.get(tableId)
+.preTransformCreateTableEvent(createTableEvent);
+}
+return createTableEvent;
+}
+
+private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
+LinkedHashSet referencedColumnsSet = new LinkedHashSet<>();
+boolean hasMatchTransform = false;
 for (PreTransformer transform : transforms) {
-Selectors selectors = transform.getSelectors();
-if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+if (!transform.getSelectors().isMatch(tableId)) {
+continue;
+}
+if (!transform.getProjection().isPresent()) {
+processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+hasMatchTransform = true;
+} else {

Review Comment:
   Would it still be correct to omit this branch and let it fall back to the `if (!hasMatchTransform)` condition? The handling logic seems to be the same.
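One way to reason about this question is a simplified, hypothetical model of the loop above: each matching rule is reduced to the source columns its projection references, `null` marks a rule without a projection, and selector matching and `isValid()` are left out. In this model the two variants agree when the projection-less rule is the only match, but they differ when it is combined with another matching rule that has a projection: without the explicit branch, `hasMatchTransform` is set by the other rule, so the fallback never adds the remaining columns.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the matching-rule loop, not the flink-cdc code itself.
final class NoProjectionBranchSketch {

    static Set<String> referencedColumns(
            List<String> schema, List<List<String>> matchingRules, boolean keepNoProjectionBranch) {
        Set<String> referenced = new LinkedHashSet<>();
        boolean hasMatchTransform = false;
        for (List<String> projection : matchingRules) {
            if (projection == null) {
                if (keepNoProjectionBranch) {
                    // The branch under discussion: treat a missing projection like "*".
                    referenced.addAll(schema);
                    hasMatchTransform = true;
                }
                // Without the branch, a projection-less rule contributes nothing here.
            } else {
                referenced.addAll(projection);
                hasMatchTransform = true;
            }
        }
        if (!hasMatchTransform) {
            // Fallback: no rule with a projection matched, so keep every column.
            referenced.addAll(schema);
        }
        return referenced;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("col1", "col2", "col3");
        List<String> noProjection = null;
        List<List<String>> onlyNoProjection = Collections.singletonList(noProjection);
        List<List<String>> mixed = Arrays.asList(noProjection, Arrays.asList("col1"));

        System.out.println(referencedColumns(schema, onlyNoProjection, true));  // [col1, col2, col3]
        System.out.println(referencedColumns(schema, onlyNoProjection, false)); // [col1, col2, col3]
        System.out.println(referencedColumns(schema, mixed, true));             // [col1, col2, col3]
        System.out.println(referencedColumns(schema, mixed, false));            // [col1]
    }
}
```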



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##
@@ -342,27 +342,77 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
 }
 }
 
+cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+if (preTransformProcessorMap.containsKey(tableId)) {
+return preTransformProcessorMap
+.get(tableId)
+.preTransformCreateTableEvent(createTableEvent);
+}
+return createTableEvent;
+}
+
+private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
+LinkedHashSet referencedColumnsSet = new LinkedHashSet<>();
+boolean hasMatchTransform = false;
 for (PreTransformer transform : transforms) {
-Selectors selectors = transform.getSelectors();
-if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+if (!transform.getSelectors().isMatch(tableId)) {
+continue;
+}
+if (!transform.getProjection().isPresent()) {
+processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+hasMatchTransform = true;
+} else {
 TransformProjection transformProjection = transform.getProjection().get();
-TransformFilter transformFilter = transform.getFilter().orElse(null);
 if (transformProjection.isValid()) {
-if (!preTransformProcessorMap.containsKey(tableId)) {
-preTransformProcessorMap.put(
-tableId,
-new PreTransformProcessor(
-tableChangeInfo, transformProjection, transformFilter));
-}
-PreTransformProcessor preTransformProcessor =
-preTransformProcessorMap.get(tableId);
-// TODO: Currently this code wrongly filters out rows that weren't referenced in
-// the first matching transform rule but in the following transform rules.
-return preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
+processProjectionTransform(
+tableId, tableSchema, referencedColumnsSet, transform);
+hasMatchTransform = true;
 }
 }
 }
-return createTableEvent;
+if (!hasMatchTransform) {
+processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+}
+}
+
+public void processProjectionTransform(
+TableId tableId,
+Schema tableSchema,
+LinkedHashSet referencedColumnsSet,
+@Nullable PreTransformer transform) {
+// If this TableId isn't presented in any transform block, it should behave like a "*"
+// projection and should be regarded as asterisk-ful.
+if (transform == null) {
+referencedColumnsSet.addAll(tableSchema.getColumns());
+hasAsteriskMap.put(tableId, true);
+} else {
+TransformProjection transformProjection = transform.getProjection().get();
+  
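A minimal sketch of the idea behind `cachePreTransformProcessor` / `processProjectionTransform`, on a hypothetical simplified model rather than the real operator: each matching rule is reduced to the source columns referenced by its projection and filter, and primary keys, metadata columns and `*` handling are left out. The removed TODO describes the old behaviour, where only the first matching rule decided which columns were kept; the new code instead accumulates the columns of every matching rule into one `referencedColumnsSet`. The rule columns below mirror `testMultiTransform`; the class and method names are made up.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model for illustration; these names do not exist in flink-cdc.
final class MultiTransformColumnsSketch {

    // Old behaviour (per the removed TODO): the first matching rule alone decides
    // which columns are kept; a table with no matching rule keeps all columns.
    static Set<String> firstMatchOnly(List<String> schema, List<List<String>> matchingRules) {
        List<String> kept = matchingRules.isEmpty() ? schema : matchingRules.get(0);
        return new LinkedHashSet<>(kept);
    }

    // New behaviour: accumulate the columns referenced by every matching rule.
    static Set<String> unionOfAllMatches(List<String> schema, List<List<String>> matchingRules) {
        if (matchingRules.isEmpty()) {
            return new LinkedHashSet<>(schema);
        }
        Set<String> referenced = new LinkedHashSet<>();
        for (List<String> ruleColumns : matchingRules) {
            referenced.addAll(ruleColumns);
        }
        return referenced;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("col1", "col2", "col3");
        // Rule 1 references col1 (projection and filter); rule 2 references col2 and col1.
        List<List<String>> rules = Arrays.asList(Arrays.asList("col1"), Arrays.asList("col2", "col1"));

        System.out.println(firstMatchOnly(schema, rules));    // [col1]: rule 2 loses col2
        System.out.println(unionOfAllMatches(schema, rules)); // [col1, col2]
    }
}
```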

Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-11 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2467581975

   I had previously overlooked the scenario where the projection is missing. I've fixed it and added integration / E2E tests. Could you review it again, @yuxiqian?
   The tests passed in my CI Actions run:
   
![image](https://github.com/user-attachments/assets/7dc9b443-4384-4d57-ba2c-f643eba0cee6)
   
   



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463886561

   CI failed; I will recheck the code.



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463867357

   > @MOBIN-F Could you kindly rebase again?
   
   done~



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463861429

   @MOBIN-F Could you kindly rebase again?



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463611452

   CI failed. It seems that PR https://github.com/apache/flink-cdc/pull/3696 should be merged first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2461897844

   > @MOBIN-F Could you help rebase this PR?
   
   @leonardBang done~
   



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-11-07 Thread via GitHub


leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2461825121

   @MOBIN-F Could you help rebase this PR?



Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]

2024-10-24 Thread via GitHub


github-actions[bot] commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2436543491

   This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.

