Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2119537106 cc @PatrickRen @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048 Thanks @aiwenmo for reviewing, addressed review comments above.
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018
## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ##
@@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
-    private void transformSchema(TableId tableId, Schema schema) throws Exception {
-        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean>
-                transform : transforms) {
-            Selectors selectors = transform.f0;
-            if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
-                TransformProjection transformProjection = transform.f1.get();
+    private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
+        List<Schema> newSchemas = new ArrayList<>();
+        for (PostTransformers transform : transforms) {
+            Selectors selectors = transform.getSelectors();
+            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+                TransformProjection transformProjection = transform.getProjection().get();
+                TransformFilter transformFilter = transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
                     if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
                         transformProjectionProcessorMap.put(
                                 transformProjection,
-                                TransformProjectionProcessor.of(transformProjection));
+                                PostTransformProcessor.of(transformProjection, transformFilter));
                     }
-                    TransformProjectionProcessor transformProjectionProcessor =
+                    PostTransformProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(transformProjection);
                     // update the columns of projection and add the column of projection into Schema
-                    transformProjectionProcessor.processSchemaChangeEvent(schema);
+                    newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
                 }
             }
         }
+        if (newSchemas.isEmpty()) {
+            return schema;
+        }
+
+        Schema firstSchema = newSchemas.get(0);
+        newSchemas.stream()
+                .skip(1)
+                .forEach(
+                        testSchema -> {
+                            if (!testSchema.equals(firstSchema)) {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Incompatible transform rules result found. Inferred schema: %s and %s",
+                                                firstSchema, testSchema));
+                            }
+                        });
+        return firstSchema;

Review Comment: Added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this.
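The thread does not show how `SchemaUtils.mergeCompatibleUpstreamSchema` works. As a rough illustration of what merging compatible schemas could mean, in place of throwing whenever two matching transform rules infer different schemas, here is a minimal sketch. It assumes columns are unioned by name in first-seen order and that a name appearing with two different types is incompatible. `SchemaMergeSketch`, `Col` and `mergeCompatible` are hypothetical names, and columns are reduced to name/type strings instead of Flink CDC `Schema` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in, not the real SchemaUtils.mergeCompatibleUpstreamSchema. */
final class SchemaMergeSketch {

    /** A column reduced to a name and a type name (assumption for the sketch). */
    static final class Col {
        final String name;
        final String type;

        Col(String name, String type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return name + " " + type;
        }
    }

    /**
     * Unions the columns inferred by several transform rules, keeping first-seen order.
     * A column name that shows up with two different types is rejected as incompatible.
     */
    static List<Col> mergeCompatible(List<List<Col>> schemas) {
        Map<String, Col> merged = new LinkedHashMap<>();
        for (List<Col> schema : schemas) {
            for (Col col : schema) {
                Col existing = merged.putIfAbsent(col.name, col);
                if (existing != null && !existing.type.equals(col.type)) {
                    throw new IllegalArgumentException(
                            String.format(
                                    "Incompatible types for column %s: %s and %s",
                                    col.name, existing.type, col.type));
                }
            }
        }
        return new ArrayList<>(merged.values());
    }
}
```

Merging `[id INT, name STRING]` with `[id INT, age INT]` gives `[id INT, name STRING, age INT]`, while `[id INT]` and `[id STRING]` are rejected. Whether the real utility rejects such conflicts or widens the types is not stated in the thread.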
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597350940
## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java: ##
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String filterExpression) {
         StringBuilder statement = new StringBuilder();
         statement.append("SELECT * FROM ");
         statement.append(DEFAULT_TABLE);
-        if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+        if (!isNullOrWhitespaceOnly(filterExpression)) {
             statement.append(" WHERE ");
             statement.append(filterExpression);
         }
         return parseSelect(statement.toString());
     }
+
+    public static SqlNode rewriteExpression(SqlNode sqlNode, Map replaceMap) {
+        if (sqlNode instanceof SqlBasicCall) {
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment: Check `SqlCall` instead of `SqlBasicCall` to cover `SqlCase`.
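In Calcite, `SqlCase` extends `SqlCall` but not `SqlBasicCall`, so an `instanceof SqlBasicCall` check never descends into `CASE` expressions. The toy model below mimics that hierarchy with hypothetical classes (`Node`, `Identifier`, `Call`, `BasicCall`, `Case`; these are not the Calcite API) to show why recursing on the common `Call` supertype lets identifier rewriting reach operands inside a `CASE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy expression tree loosely modelled on Calcite's hierarchy (hypothetical classes). */
final class RewriteSketch {
    abstract static class Node {}

    static final class Identifier extends Node {
        final String name;

        Identifier(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Common supertype of every call, playing the role of SqlCall. */
    abstract static class Call extends Node {
        final List<Node> operands;

        Call(List<Node> operands) {
            this.operands = new ArrayList<>(operands);
        }
    }

    /** Plain operator call, playing the role of SqlBasicCall. */
    static final class BasicCall extends Call {
        final String op;

        BasicCall(String op, List<Node> operands) {
            super(operands);
            this.op = op;
        }

        @Override
        public String toString() {
            return op + operands;
        }
    }

    /** CASE expression: a Call but not a BasicCall, like SqlCase. */
    static final class Case extends Call {
        Case(List<Node> operands) {
            super(operands);
        }

        @Override
        public String toString() {
            return "CASE" + operands;
        }
    }

    /** Replaces identifiers found in replaceMap, recursing into every kind of Call. */
    static Node rewrite(Node node, Map<String, Node> replaceMap) {
        if (node instanceof Call) { // checking BasicCall here would skip Case nodes
            Call call = (Call) node;
            for (int i = 0; i < call.operands.size(); i++) {
                call.operands.set(i, rewrite(call.operands.get(i), replaceMap));
            }
            return call;
        }
        if (node instanceof Identifier) {
            Node replacement = replaceMap.get(((Identifier) node).name);
            return replacement != null ? replacement : node;
        }
        return node;
    }
}
```

Rewriting `CASE[>[age, x], name]` with `age` and `name` mapped to prefixed names yields `CASE[>[__PREFIX__age, x], __PREFIX__name]`. If the check were narrowed to `BasicCall`, the `Case` node would be returned untouched.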
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
aiwenmo commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597329925
## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ##
@@ -362,25 +376,51 @@ private Optional processFilter(
     }
     private Optional<DataChangeEvent> processProjection(
-            TransformProjectionProcessor transformProjectionProcessor,
+            PostTransformProcessor postTransformProcessor,
             DataChangeEvent dataChangeEvent,
-            long epochTime)
-            throws Exception {
+            long epochTime) {
         BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
         if (before != null) {
             BinaryRecordData projectedBefore =
-                    transformProjectionProcessor.processData(before, epochTime);
+                    postTransformProcessor.processData(before, epochTime);
+            dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
+        }
+        if (after != null) {
+            BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
+            dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
+        }
+        return Optional.of(dataChangeEvent);
+    }
+
+    private Optional<DataChangeEvent> processPostProjection(
+            TableInfo tableInfo, DataChangeEvent dataChangeEvent) throws Exception {
+        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
+        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+        if (before != null) {
+            BinaryRecordData projectedBefore = projectRecord(tableInfo, before);
             dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
         }
         if (after != null) {
-            BinaryRecordData projectedAfter =
-                    transformProjectionProcessor.processData(after, epochTime);
+            BinaryRecordData projectedAfter = projectRecord(tableInfo, after);
             dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
         }
         return Optional.of(dataChangeEvent);
     }
+    private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
+        List<Object> valueList = new ArrayList<>();
+        RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
+
+        for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+            valueList.add(fieldGetter.getFieldOrNull(recordData));
+        }
+
+        return tableInfo
+                .getRecordDataGenerator()
+                .generate(valueList.toArray(new Object[valueList.size()]));
+    }
+
     private boolean containFilteredComputedColumn(String projection, String filter) {
         boolean contain = false;

Review Comment: Since the filter always executes before the projection, this method is no longer needed.

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java: ##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Transformation rules used by {@link PostTransformOperator}. */
+public class PostTransformers {
+    private final Selectors selectors;
+
+    private final Optional<TransformProjection> projection;
+    private final Optional<TransformFilter> filter;
+
+    private final boolean containFilteredComputedColumn;
+

Review Comment: Since the filter always executes before the projection, this private field is no longer needed.

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ##
@@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
-    private void transformSchema(TableId tableId, Schema schema) throws
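`projectRecord` in the diff above reads each value through the target table's field getters and hands the resulting array to the record generator, so the output row has exactly the target table's columns, in its order. A minimal sketch of that loop, assuming rows are plain `Object[]` arrays; `ProjectRecordSketch`, `FieldGetter` and `fieldAt` are hypothetical stand-ins, not Flink CDC's `RecordData.FieldGetter` or `BinaryRecordDataGenerator`.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of projectRecord; hypothetical types, rows are Object arrays. */
final class ProjectRecordSketch {
    /** Stand-in for a field getter: returns the field value, or null if the field is null. */
    interface FieldGetter {
        Object getFieldOrNull(Object[] row);
    }

    /** Getter bound to a fixed position in the upstream row. */
    static FieldGetter fieldAt(int pos) {
        return row -> row[pos];
    }

    /** Reads one value per getter, in getter order, and builds the output row from them. */
    static Object[] projectRecord(FieldGetter[] fieldGetters, Object[] recordData) {
        List<Object> valueList = new ArrayList<>();
        for (FieldGetter fieldGetter : fieldGetters) {
            valueList.add(fieldGetter.getFieldOrNull(recordData));
        }
        return valueList.toArray(new Object[0]);
    }
}
```

With getters for positions 0 and 2, the row `[1, "ALICE", null, 19]` is projected to `[1, null]`: a null field simply stays null.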
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2097757324 Thanks for @aiwenmo's kind review; addressed the comments above.
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
aiwenmo commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1591730125
## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java: ##
@@ -143,7 +145,10 @@ public static SqlSelect parseSelect(String statement) {
     // Parse all columns
     public static List generateProjectionColumns(
-            String projectionExpression, List columns) {
+            String projectionExpression,
+            @Nullable String filterExpression,
+            List columns,
+            boolean keepReferencedColumns) {
         if (StringUtils.isNullOrWhitespaceOnly(projectionExpression)) {

Review Comment: Splitting this method into `generateProjectionColumns` and `generateReferencedColumns` would make it easier to maintain.

## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java: ##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/** Unit tests for the {@link PreTransformOperator}. */
+public class PreTransformOperatorTest {
+    private static final TableId CUSTOMERS_TABLEID =
+            TableId.tableId("my_company", "my_branch", "customers");
+    private static final Schema CUSTOMERS_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col2", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+    private static final Schema CUSTOMERS_LATEST_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col2", DataTypes.STRING())
+                    .physicalColumn("col3", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+    private static final Schema EXPECT_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col2", DataTypes.STRING())
+                    .physicalColumn("col12", DataTypes.STRING())
+                    .primaryKey("col2")
+                    .partitionKey("col12")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+    private static final Schema EXPECT_LATEST_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col2", DataTypes.STRING())
+                    .physicalColumn("col12", DataTypes.STRING())
+                    .physicalColumn("col3", DataTypes.STRING())
+                    .primaryKey("col2")
+                    .partitionKey("col12")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final Schema NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
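The suggestion above separates "which columns does the output have" from "which upstream columns do the expressions use". Here is a minimal sketch of the second half. It assumes a hypothetical `generateReferencedColumns(projection, filter, columns)` signature and uses a naive regex identifier scan in place of the SQL parsing that `TransformParser` performs, so it ignores quoting, string literals and case-insensitivity. `ReferencedColumnsSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, regex-based sketch of a separate "referenced columns" helper. */
final class ReferencedColumnsSketch {
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /** Returns the schema columns mentioned in the projection or filter, in schema order. */
    static List<String> generateReferencedColumns(
            String projectionExpression, String filterExpression, List<String> columns) {
        Set<String> tokens = new HashSet<>();
        for (String expr : new String[] {projectionExpression, filterExpression}) {
            if (expr == null) {
                continue; // the filter is optional
            }
            Matcher m = IDENTIFIER.matcher(expr);
            while (m.find()) {
                tokens.add(m.group());
            }
        }
        // Keep only tokens that are real upstream columns; function names and aliases drop out.
        List<String> referenced = new ArrayList<>();
        for (String column : columns) {
            if (tokens.contains(column)) {
                referenced.add(column);
            }
        }
        return referenced;
    }
}
```

For `projection: id, upper(name) as newname` and `filter: age > 18` over `[id, name, age, email]`, it returns `[id, name, age]`; `upper`, `as` and `newname` are dropped because they are not schema columns.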
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2095341052 Updated based on previous comments, cc @aiwenmo
[PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian opened a new pull request, #3285: URL: https://github.com/apache/flink-cdc/pull/3285

This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272).

Currently, pipeline jobs with transform (including projection and filtering) are constructed with the following topology:

```
SchemaTransformOp --> DataTransformOp --> SchemaOp
```

where schema projections are applied in `SchemaTransformOp`, and data projection and filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded into `Sources` in the future to reduce the amount of payload data transferred within the Flink job.

However, the current implementation has a known defect: it omits unused columns too early, so columns that downstream operators rely on have already been removed by the time data arrives in `DataTransformOp`. See the following example:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as newname
    filter: age > 18
```

Such transformation rules fail because the `name` and `age` columns are removed in `SchemaTransformOp`, so their values cannot be retrieved in `DataTransformOp`, where expression evaluation and filtering actually take place.

This PR introduces a new design with the following transform topology:

```
PreTransformOp --> PostTransformOp --> SchemaOp
```

where `PreTransformOp` filters out a column only if both of the following hold:

* The column is not present in projection rules
* The column is not indirectly referenced by calculation or filtering expressions

If a column is explicitly written in the projection, it is passed downstream as-is. For columns that are only referenced, a special prefix is added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` is sent downstream. Note that expression evaluation and filtering do not take effect at this stage, so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`.
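The column list that `PreTransformOp` emits in this example can be reproduced with a minimal sketch. It assumes the projection is given as an ordered alias-to-expression map (a bare column maps to itself) and that the set of referenced columns has already been computed. `PreTransformSchemaSketch`, `preTransformColumns` and the literal `__PREFIX__` string are placeholders, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the column list emitted by PreTransformOp; not the PR's code. */
final class PreTransformSchemaSketch {
    /** Placeholder prefix copied from the PR description. */
    static final String PREFIX = "__PREFIX__";

    /**
     * @param projection alias -> expression in projection order; a bare column maps to itself
     * @param physical upstream physical columns in schema order
     * @param referenced physical columns used by computed expressions or by the filter
     */
    static List<String> preTransformColumns(
            Map<String, String> projection, List<String> physical, Set<String> referenced) {
        List<String> result = new ArrayList<>();
        Set<String> passedThrough = new HashSet<>();
        for (Map.Entry<String, String> item : projection.entrySet()) {
            // Every projection output gets a slot; computed ones hold null until PostTransformOp.
            result.add(item.getKey());
            if (item.getKey().equals(item.getValue())) {
                passedThrough.add(item.getKey()); // explicitly written column, kept as-is
            }
        }
        for (String column : physical) {
            // Referenced-only columns are kept too, under a mangled name.
            if (referenced.contains(column) && !passedThrough.contains(column)) {
                result.add(PREFIX + column);
            }
        }
        return result;
    }
}
```

For `id, upper(name) as newname` with `name` and `age` referenced, it returns `[id, newname, __PREFIX__name, __PREFIX__age]`, matching the schema above. Keeping computed and referenced columns under distinct names is what lets a computed column reuse an upstream column's name.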
The prefix is meant to handle cases like this:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as name
    filter: age > 18
```

Here we need to distinguish the calculated column `(new) name` from the referenced original column `(old) name`, so after name mangling the schema would look like `[id, name, __PREFIX__name]`.

Also, filtering is still done in `PostTransformOp`, since users may write a filter expression that references a calculated column, whose value is not available until `PostTransformOp` evaluates it. This also means that in the following somewhat ambiguous case:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, age * 2 as age
    filter: age > 18
```

the filter expression is applied to the calculated `age` column (doubled!) instead of the original one.
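The semantics of the last example can be modelled by resolving the filter's identifiers against the computed columns first and falling back to upstream columns. This is a minimal sketch under that assumption, equivalent to substituting the computed expression into the filter; it does not claim to mirror how `PostTransformOp` orders its steps internally. `FilterSemanticsSketch` and `transform` are hypothetical names, and rows are modelled as `Map<String, Object>`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical model of the filter semantics described above; not Flink CDC code. */
final class FilterSemanticsSketch {
    /**
     * Evaluates each projection item on the upstream row, then tests the filter in a scope
     * where computed columns shadow upstream columns of the same name.
     */
    static Optional<Map<String, Object>> transform(
            Map<String, Object> row,
            Map<String, Function<Map<String, Object>, Object>> projection,
            Predicate<Map<String, Object>> filter) {
        Map<String, Object> projected = new LinkedHashMap<>();
        for (Map.Entry<String, Function<Map<String, Object>, Object>> item :
                projection.entrySet()) {
            projected.put(item.getKey(), item.getValue().apply(row));
        }
        Map<String, Object> filterScope = new HashMap<>(row);
        filterScope.putAll(projected); // "age" now means the computed age
        return filter.test(filterScope) ? Optional.of(projected) : Optional.empty();
    }
}
```

An employee with `age = 10` passes, because the filter sees the doubled value 20; with `age = 9` the computed value is 18 and the row is dropped.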