Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
leonardBang merged PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2475352297

> @MOBIN-F Would you like to open a backport PR for `release-3.2` branch?

OK
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2475350233

@MOBIN-F Would you like to open a backport PR for `release-3.2` branch?
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2472340148

The OceanBase tests failed, but the failure seems to be unrelated to this PR.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
aiwenmo commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2469497337

> Thanks for cleaning up `PreTransformOperator`! Looks much neater now, just left some trivial comments.
>
> Also +@aiwenmo who is familiar with transform modules.

Hi @MOBIN-F, thanks for the contribution. I will help review the code tonight.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1837413936

## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java: ##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
                 new StreamRecord<>(
                         new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
     }
+
+    @Test
+    void testMultiTransform() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()

Review Comment:
   Done. I also added some tests above.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836241798

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java: ##
@@ -386,37 +436,16 @@ private Schema transformSchemaMetaData(
         return schemaBuilder.build();
     }

-    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent)
-            throws Exception {
-        TableId tableId = dataChangeEvent.tableId();
-        for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-
-            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
-                TransformProjection transformProjection = transform.getProjection().get();
-                TransformFilter transformFilter = transform.getFilter().orElse(null);
-                if (transformProjection.isValid()) {
-                    return processProjection(transformProjection, transformFilter, dataChangeEvent);
-                }
-            }
+    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) {
+        if (!transforms.isEmpty()) {
+            return processProjection(dataChangeEvent);
         }
         return dataChangeEvent;
     }

Review Comment:
   This logic seems too simple to be worth keeping as an independent method.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836260088

## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java: ##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
                 new StreamRecord<>(
                         new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
     }
+
+    @Test
+    void testMultiTransform() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()

Review Comment:
   Maybe we could also cover the cases where the projection is `*` or omitted here.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836254296

## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java: ##
@@ -526,4 +526,39 @@ void testMetadataTransform() throws Exception {
                 new StreamRecord<>(
                         new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
     }
+
+    @Test
+    void testMultiTransform() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "col1, upper(col1) col12",
+                                "col1 = 'col1'",
+                                "col2",
+                                "col12",
+                                "key1=value1,key2=value2")
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "col2, upper(col2) col12",
+                                "col1 != 'col1'",
+                                "col2",
+                                "col12",
+                                "key1=value1,key2=value2")
+                        .build();

Review Comment:
   Since this bug depends on the order of the rules, it would be nice to also verify it with the rules swapped.
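Editorial sketch of why swapping the rules is a useful check. It assumes each rule can be reduced to the list of column names its projection and filter reference. The class `RuleOrderSketch` and its methods are hypothetical and are not part of Flink CDC. It compares a simplified "first matching rule wins" strategy with a union over all matching rules.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: none of these names come from Flink CDC. */
final class RuleOrderSketch {

    /** Simplified old behaviour: only the first matching rule decides which columns survive. */
    static Set<String> firstMatchOnly(List<List<String>> rules) {
        Set<String> columns = new LinkedHashSet<>();
        if (!rules.isEmpty()) {
            columns.addAll(rules.get(0));
        }
        return columns;
    }

    /** Simplified fixed behaviour: keep every column referenced by any matching rule. */
    static Set<String> unionOfAll(List<List<String>> rules) {
        Set<String> columns = new LinkedHashSet<>();
        for (List<String> rule : rules) {
            columns.addAll(rule);
        }
        return columns;
    }

    public static void main(String[] args) {
        // Assumed column references for the two rules in testMultiTransform.
        List<String> ruleA = Arrays.asList("col1");         // projection and filter use col1
        List<String> ruleB = Arrays.asList("col2", "col1"); // projection uses col2, filter uses col1

        List<List<String>> ab = Arrays.asList(ruleA, ruleB);
        List<List<String>> ba = Arrays.asList(ruleB, ruleA);

        // Set.equals ignores iteration order, so this compares column membership only.
        System.out.println(firstMatchOnly(ab).equals(firstMatchOnly(ba))); // false
        System.out.println(unionOfAll(ab).equals(unionOfAll(ba)));         // true
    }
}
```

Under these assumptions the first-match strategy keeps different columns depending on rule order, while the union does not, which is the property a swapped-rules test would pin down.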
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836206210

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java: ##
@@ -342,27 +342,77 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
             }
         }

+        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        if (preTransformProcessorMap.containsKey(tableId)) {
+            return preTransformProcessorMap
+                    .get(tableId)
+                    .preTransformCreateTableEvent(createTableEvent);
+        }
+        return createTableEvent;
+    }
+
+    private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
+        LinkedHashSet referencedColumnsSet = new LinkedHashSet<>();
+        boolean hasMatchTransform = false;
         for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+            if (!transform.getSelectors().isMatch(tableId)) {
+                continue;
+            }
+            if (!transform.getProjection().isPresent()) {
+                processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+                hasMatchTransform = true;
+            } else {

Review Comment:
   Would it be correct to omit this branch and let it fall back to the `if(!hasMatchTransform)` condition? The handling logic seems to be the same.

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java: ##
@@ -342,27 +342,77 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
             }
         }

+        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        if (preTransformProcessorMap.containsKey(tableId)) {
+            return preTransformProcessorMap
+                    .get(tableId)
+                    .preTransformCreateTableEvent(createTableEvent);
+        }
+        return createTableEvent;
+    }
+
+    private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
+        LinkedHashSet referencedColumnsSet = new LinkedHashSet<>();
+        boolean hasMatchTransform = false;
         for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+            if (!transform.getSelectors().isMatch(tableId)) {
+                continue;
+            }
+            if (!transform.getProjection().isPresent()) {
+                processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+                hasMatchTransform = true;
+            } else {
                 TransformProjection transformProjection = transform.getProjection().get();
-                TransformFilter transformFilter = transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
-                    if (!preTransformProcessorMap.containsKey(tableId)) {
-                        preTransformProcessorMap.put(
-                                tableId,
-                                new PreTransformProcessor(
-                                        tableChangeInfo, transformProjection, transformFilter));
-                    }
-                    PreTransformProcessor preTransformProcessor =
-                            preTransformProcessorMap.get(tableId);
-                    // TODO: Currently this code wrongly filters out rows that weren't referenced in
-                    // the first matching transform rule but in the following transform rules.
-                    return preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
+                    processProjectionTransform(
+                            tableId, tableSchema, referencedColumnsSet, transform);
+                    hasMatchTransform = true;
                 }
             }
         }
-        return createTableEvent;
+        if (!hasMatchTransform) {
+            processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+        }
+    }
+
+    public void processProjectionTransform(
+            TableId tableId,
+            Schema tableSchema,
+            LinkedHashSet referencedColumnsSet,
+            @Nullable PreTransformer transform) {
+        // If this TableId isn't presented in any transform block, it should behave like a "*"
+        // projection and should be regarded as asterisk-ful.
+        if (transform == null) {
+            referencedColumnsSet.addAll(tableSchema.getColumns());
+            hasAsteriskMap.put(tableId, true);
+        } else {
+            TransformProjection transformProjection = transform.getProjection().get();
+
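Editorial sketch of the fallback described in the code comment above: a table with no matching rule, or a matching rule with no projection, is treated like a `*` projection. It uses plain Java collections only. The class `AsteriskFallbackSketch`, the method `retainedColumns`, and the use of `null` to stand for an omitted projection are all assumptions made for illustration, and the validity check on projections is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: none of these names come from Flink CDC. */
final class AsteriskFallbackSketch {

    /**
     * @param schemaColumns all column names of the table, in schema order
     * @param matchingRules one entry per matching rule: the columns it references,
     *                      or null when the rule omits its projection (assumed encoding)
     * @return the columns to keep, in schema order
     */
    static List<String> retainedColumns(
            List<String> schemaColumns, List<List<String>> matchingRules) {
        Set<String> referenced = new LinkedHashSet<>();
        boolean hasMatch = false;
        for (List<String> rule : matchingRules) {
            hasMatch = true;
            if (rule == null) {
                // An omitted projection behaves like "*": every column is referenced.
                referenced.addAll(schemaColumns);
            } else {
                referenced.addAll(rule);
            }
        }
        if (!hasMatch) {
            // No rule matched this table at all: also behave like "*".
            referenced.addAll(schemaColumns);
        }
        // Walk the schema so the result keeps the original column order.
        List<String> retained = new ArrayList<>();
        for (String column : schemaColumns) {
            if (referenced.contains(column)) {
                retained.add(column);
            }
        }
        return retained;
    }
}
```

In this sketch the two `*`-like cases share the same handling, which is the observation behind the review question about folding the omitted-projection branch into the `hasMatchTransform` fallback.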
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2467581975

I previously overlooked the scenario where the projection is missing. I have fixed it and added integration and E2E tests. Could you review it again, @yuxiqian?

The tests passed in my CI Action run:
![image](https://github.com/user-attachments/assets/7dc9b443-4384-4d57-ba2c-f643eba0cee6)
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463886561

CI failed. I will recheck the code.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463867357

> @MOBIN-F Could you kindly rebase again?

done~
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463861429

@MOBIN-F Could you kindly rebase again?
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2463611452

CI failed. It seems that PR https://github.com/apache/flink-cdc/pull/3696 should be merged first.
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
MOBIN-F commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2461897844

> @MOBIN-F Could you help rebase this PR?

@leonardBang done~
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
leonardBang commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2461825121

@MOBIN-F Could you help rebase this PR?
Re: [PR] [FLINK-36093][transform] fix preTransformoperator wrongly filters columns when multiple transform [flink-cdc]
github-actions[bot] commented on PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#issuecomment-2436543491

This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.