Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-19 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2119537106

   cc @PatrickRen @lvyanquan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-11 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048

   Thanks @aiwenmo for reviewing, addressed review comments above.





Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
 
-    private void transformSchema(TableId tableId, Schema schema) throws Exception {
-        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean>
-                transform : transforms) {
-            Selectors selectors = transform.f0;
-            if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
-                TransformProjection transformProjection = transform.f1.get();
+    private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
+        List<Schema> newSchemas = new ArrayList<>();
+        for (PostTransformers transform : transforms) {
+            Selectors selectors = transform.getSelectors();
+            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+                TransformProjection transformProjection = transform.getProjection().get();
+                TransformFilter transformFilter = transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
                     if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
                         transformProjectionProcessorMap.put(
                                 transformProjection,
-                                TransformProjectionProcessor.of(transformProjection));
+                                PostTransformProcessor.of(transformProjection, transformFilter));
                     }
-                    TransformProjectionProcessor transformProjectionProcessor =
+                    PostTransformProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(transformProjection);
                     // update the columns of projection and add the column of projection into Schema
-                    transformProjectionProcessor.processSchemaChangeEvent(schema);
+                    newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
                 }
             }
         }
+        if (newSchemas.isEmpty()) {
+            return schema;
+        }
+
+        Schema firstSchema = newSchemas.get(0);
+        newSchemas.stream()
+                .skip(1)
+                .forEach(
+                        testSchema -> {
+                            if (!testSchema.equals(firstSchema)) {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Incompatible transform rules result found. Inferred schema: %s and %s",
+                                                firstSchema, testSchema));
+                            }
+                        });
+        return firstSchema;

Review Comment:
   Added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this.
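
   For context, a simplified and self-contained sketch of the idea (the actual `SchemaUtils.mergeCompatibleUpstreamSchema` utility added in this PR may differ): instead of rejecting any two differing inferred schemas, merge them when their columns line up and their types can be widened. Schemas are modeled here as plain column-name-to-type maps.

   ```
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class SchemaMergeSketch {
       /** Merges inferred schemas, modeled as column name -> type name maps in declaration order. */
       static Map<String, String> merge(List<Map<String, String>> schemas) {
           Map<String, String> merged = new LinkedHashMap<>(schemas.get(0));
           for (Map<String, String> schema : schemas.subList(1, schemas.size())) {
               if (!schema.keySet().equals(merged.keySet())) {
                   throw new IllegalArgumentException(
                           "Incompatible column sets: " + merged.keySet() + " vs " + schema.keySet());
               }
               schema.forEach((name, type) -> merged.merge(name, type, SchemaMergeSketch::widerType));
           }
           return merged;
       }

       /** Toy widening rule: INT and BIGINT merge into BIGINT; otherwise types must match exactly. */
       static String widerType(String a, String b) {
           if (a.equals(b)) {
               return a;
           }
           if (List.of(a, b).contains("INT") && List.of(a, b).contains("BIGINT")) {
               return "BIGINT";
           }
           throw new IllegalArgumentException("Cannot merge types " + a + " and " + b);
       }

       public static void main(String[] args) {
           Map<String, String> lhs = new LinkedHashMap<>();
           lhs.put("id", "INT");
           lhs.put("name", "STRING");
           Map<String, String> rhs = new LinkedHashMap<>();
           rhs.put("id", "BIGINT");
           rhs.put("name", "STRING");
           System.out.println(merge(List.of(lhs, rhs))); // id is widened to BIGINT
       }
   }
   ```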






Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597350940


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String filterExpression) {
         StringBuilder statement = new StringBuilder();
         statement.append("SELECT * FROM ");
         statement.append(DEFAULT_TABLE);
-        if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+        if (!isNullOrWhitespaceOnly(filterExpression)) {
             statement.append(" WHERE ");
             statement.append(filterExpression);
         }
         return parseSelect(statement.toString());
     }
+
+    public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, SqlNode> replaceMap) {
+        if (sqlNode instanceof SqlBasicCall) {
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment:
   Match on `SqlCall` instead of `SqlBasicCall` to cover `SqlCase`.
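
   A minimal sketch of that change, assuming Calcite's `SqlNode` API: matching on `SqlCall`, the common parent of `SqlBasicCall` and `SqlCase`, lets `CASE ... WHEN` expressions be traversed too. A complete version would also recurse into `SqlNodeList` operands (the WHEN/THEN lists of a CASE), which this sketch leaves out.

   ```
   import org.apache.calcite.sql.SqlCall;
   import org.apache.calcite.sql.SqlIdentifier;
   import org.apache.calcite.sql.SqlNode;

   import java.util.List;
   import java.util.Map;

   public class RewriteSketch {
       /** Replaces simple identifiers found in replaceMap, recursing through any SqlCall. */
       public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, SqlNode> replaceMap) {
           if (sqlNode instanceof SqlCall) {
               SqlCall call = (SqlCall) sqlNode;
               List<SqlNode> operands = call.getOperandList();
               for (int i = 0; i < operands.size(); i++) {
                   SqlNode operand = operands.get(i);
                   if (operand != null) {
                       // Both SqlBasicCall and SqlCase override setOperand, so this rewrites
                       // the operands of ordinary calls and CASE expressions alike.
                       call.setOperand(i, rewriteExpression(operand, replaceMap));
                   }
               }
               return call;
           }
           if (sqlNode instanceof SqlIdentifier && replaceMap.containsKey(sqlNode.toString())) {
               return replaceMap.get(sqlNode.toString());
           }
           return sqlNode;
       }
   }
   ```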



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String filterExpression) {
         StringBuilder statement = new StringBuilder();
         statement.append("SELECT * FROM ");
         statement.append(DEFAULT_TABLE);
-        if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+        if (!isNullOrWhitespaceOnly(filterExpression)) {
             statement.append(" WHERE ");
             statement.append(filterExpression);
         }
         return parseSelect(statement.toString());
     }
+
+    public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, SqlNode> replaceMap) {
+        if (sqlNode instanceof SqlBasicCall) {
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment:
   Check for `SqlCall` instead of `SqlBasicCall` to cover `SqlCase`.






Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


aiwenmo commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597329925


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -362,25 +376,51 @@ private Optional<DataChangeEvent> processFilter(
     }
 
     private Optional<DataChangeEvent> processProjection(
-            TransformProjectionProcessor transformProjectionProcessor,
+            PostTransformProcessor postTransformProcessor,
             DataChangeEvent dataChangeEvent,
-            long epochTime)
-            throws Exception {
+            long epochTime) {
         BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
         if (before != null) {
             BinaryRecordData projectedBefore =
-                    transformProjectionProcessor.processData(before, epochTime);
+                    postTransformProcessor.processData(before, epochTime);
+            dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
+        }
+        if (after != null) {
+            BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
+            dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
+        }
+        return Optional.of(dataChangeEvent);
+    }
+
+    private Optional<DataChangeEvent> processPostProjection(
+            TableInfo tableInfo, DataChangeEvent dataChangeEvent) throws Exception {
+        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
+        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+        if (before != null) {
+            BinaryRecordData projectedBefore = projectRecord(tableInfo, before);
             dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
         }
         if (after != null) {
-            BinaryRecordData projectedAfter =
-                    transformProjectionProcessor.processData(after, epochTime);
+            BinaryRecordData projectedAfter = projectRecord(tableInfo, after);
             dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
         }
         return Optional.of(dataChangeEvent);
     }
 
+    private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) {
+        List<Object> valueList = new ArrayList<>();
+        RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
+
+        for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+            valueList.add(fieldGetter.getFieldOrNull(recordData));
+        }
+
+        return tableInfo
+                .getRecordDataGenerator()
+                .generate(valueList.toArray(new Object[valueList.size()]));
+    }
+
     private boolean containFilteredComputedColumn(String projection, String filter) {
         boolean contain = false;
Review Comment:
   Since the filter now always executes before the projection, this method is no longer useful.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Transformation rules used by {@link PostTransformOperator}. */
+public class PostTransformers {
+    private final Selectors selectors;
+
+    private final Optional<TransformProjection> projection;
+    private final Optional<TransformFilter> filter;
+
+    private final boolean containFilteredComputedColumn;
+

Review Comment:
   Since the filter now always executes before the projection, this private variable is no longer useful.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
 
-    private void transformSchema(TableId tableId, Schema schema) throws

Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-07 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2097757324

   Thanks for @aiwenmo's kind review, addressed comments above.





Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-06 Thread via GitHub


aiwenmo commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1591730125


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##
@@ -143,7 +145,10 @@ public static SqlSelect parseSelect(String statement) {
 
 // Parse all columns
 public static List<ProjectionColumn> generateProjectionColumns(
-        String projectionExpression, List<Column> columns) {
+        String projectionExpression,
+        @Nullable String filterExpression,
+        List<Column> columns,
+        boolean keepReferencedColumns) {
     if (StringUtils.isNullOrWhitespaceOnly(projectionExpression)) {

Review Comment:
   Splitting the method into `generateProjectionColumns` and `generateReferencedColumns` would make this easier to maintain.
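
   A hypothetical sketch of that split. It is deliberately simplified (the real implementation derives columns through the Calcite parser rather than string handling), but it shows the two responsibilities pulled apart: deriving the projected output columns versus collecting the upstream columns that projection or filter expressions reference.

   ```
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public class ColumnDerivationSketch {
       private static final Pattern IDENTIFIER = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");

       /** Derives the output column names of a projection rule like "id, upper(name) as newname". */
       static List<String> generateProjectionColumns(String projectionExpression) {
           List<String> projected = new ArrayList<>();
           for (String item : projectionExpression.split(",")) {
               String[] parts = item.trim().split("(?i)\\s+as\\s+");
               projected.add(parts[parts.length - 1].trim());
           }
           return projected;
       }

       /** Collects upstream columns referenced by projection or filter expressions. */
       static Set<String> generateReferencedColumns(
               String projectionExpression, String filterExpression, List<String> schemaColumns) {
           Set<String> referenced = new LinkedHashSet<>();
           Matcher matcher = IDENTIFIER.matcher(projectionExpression + " " + filterExpression);
           while (matcher.find()) {
               if (schemaColumns.contains(matcher.group())) {
                   referenced.add(matcher.group());
               }
           }
           return referenced;
       }

       public static void main(String[] args) {
           List<String> schema = List.of("id", "name", "age");
           System.out.println(generateProjectionColumns("id, upper(name) as newname"));
           // -> [id, newname]
           System.out.println(
                   generateReferencedColumns("id, upper(name) as newname", "age > 18", schema));
           // -> [id, name, age]
       }
   }
   ```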



##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java:
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/** Unit tests for the {@link PreTransformOperator}. */
+public class PreTransformOperatorTest {
+private static final TableId CUSTOMERS_TABLEID =
+TableId.tableId("my_company", "my_branch", "customers");
+private static final Schema CUSTOMERS_SCHEMA =
+Schema.newBuilder()
+.physicalColumn("col1", DataTypes.STRING())
+.physicalColumn("col2", DataTypes.STRING())
+.primaryKey("col1")
+.build();
+private static final Schema CUSTOMERS_LATEST_SCHEMA =
+Schema.newBuilder()
+.physicalColumn("col1", DataTypes.STRING())
+.physicalColumn("col2", DataTypes.STRING())
+.physicalColumn("col3", DataTypes.STRING())
+.primaryKey("col1")
+.build();
+private static final Schema EXPECT_SCHEMA =
+Schema.newBuilder()
+.physicalColumn("col1", DataTypes.STRING())
+.physicalColumn("col2", DataTypes.STRING())
+.physicalColumn("col12", DataTypes.STRING())
+.primaryKey("col2")
+.partitionKey("col12")
+.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+.build();
+private static final Schema EXPECT_LATEST_SCHEMA =
+Schema.newBuilder()
+.physicalColumn("col1", DataTypes.STRING())
+.physicalColumn("col2", DataTypes.STRING())
+.physicalColumn("col12", DataTypes.STRING())
+.physicalColumn("col3", DataTypes.STRING())
+.primaryKey("col2")
+.partitionKey("col12")
+.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+.build();
+
+private static final Schema NULLABILITY_SCHEMA =
+Schema.newBuilder()
+.physicalColumn("id", DataTypes.STRING().notNull())
+.physicalColumn("name", DataTypes.STRING())
+.primaryKey("id")
+  

Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-06 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2095341052

   Updated based on previous comments, cc @aiwenmo





[PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-04-30 Thread via GitHub


yuxiqian opened a new pull request, #3285:
URL: https://github.com/apache/flink-cdc/pull/3285

   This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272).
   
   Currently, pipeline jobs with transform (including projection and filtering) 
are constructed with the following topology:
   
   ```
   SchemaTransformOp --> DataTransformOp --> SchemaOp
   ```
   
   where schema projections are applied in `SchemaTransformOp` and data projection & filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded into `Sources` in the future to reduce the payload size transferred within the Flink job.
   
   However, the current implementation has a known defect: it prunes unused columns too early, so columns that downstream operators rely on are already gone by the time events arrive at `DataTransformOp`. See the following example:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, upper(name) as newname
   filter: age > 18
   ```
   
   Such transformation rules will fail: the `name` and `age` columns are removed in `SchemaTransformOp`, so their values can no longer be retrieved in `DataTransformOp`, where the actual expression evaluation and filtering take effect.
   
   This PR introduces a new design that renames the transform topology as follows:
   
   ```
   PreTransformOp --> PostTransformOp --> SchemaOp
   ```
   
   where `PreTransformOp` filters out a column only if:
   * the column is not present in the projection rules, and
   * the column is not referenced, even indirectly, by any calculation or filtering expression
   
   If a column is explicitly written down in the projection, it is passed downstream as-is. Columns that are only referenced get a special prefix added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` is sent downstream. Note that expression evaluation and filtering have not taken effect at this stage, so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`.
   
   Adding the prefix is meant to handle cases like this:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, upper(name) as name
   filter: age > 18
   ```
   
   Here we need to distinguish between the calculated column (the new `name`) and the referenced original column (the old `name`). After the name-mangling step, the schema would look like `[id, name, __PREFIX__name]`.
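
   To make the mangling concrete, here is a hypothetical sketch in plain Java (the prefix constant and helper names are illustrative, not the PR's actual identifiers): explicitly projected columns pass through under their own names, while referenced-only columns are forwarded under a prefixed name so they cannot collide with same-named calculated columns.

   ```
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   public class PreTransformSchemaDemo {
       private static final String PREFIX = "__PREFIX__";

       /** Explicit projection columns pass through; referenced-only columns get mangled names. */
       static List<String> preTransformColumns(
               List<String> projectedColumns, Set<String> referencedColumns) {
           List<String> output = new ArrayList<>(projectedColumns);
           for (String referenced : referencedColumns) {
               output.add(PREFIX + referenced);
           }
           return output;
       }

       public static void main(String[] args) {
           // projection: id, upper(name) as newname / filter: age > 18
           List<String> projected = Arrays.asList("id", "newname");
           Set<String> referenced = new LinkedHashSet<>(Arrays.asList("name", "age"));
           System.out.println(preTransformColumns(projected, referenced));
           // -> [id, newname, __PREFIX__name, __PREFIX__age]
       }
   }
   ```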
   
   Also, the filtering process is still done in `PostTransformOp`, since users may write a filter expression that references a calculated column, whose value is not available until `PostTransformOp`'s evaluation. This also means that in the following, somewhat ambiguous case:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, age * 2 as age
   filter: age > 18
   ```
   
   The filter expression is applied to the calculated `age` column (doubled!) instead of the original one.
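
   A toy illustration of that semantic in plain Java (independent of the actual operator code): the calculated column is produced first, and the filter is then evaluated against the calculated value.

   ```
   import java.util.HashMap;
   import java.util.Map;

   public class FilterSemanticsDemo {
       public static void main(String[] args) {
           Map<String, Object> row = new HashMap<>();
           row.put("id", 1);
           row.put("age", 10);

           // projection: id, age * 2 as age -- the calculated column shadows the original
           row.put("age", (int) row.get("age") * 2);

           // filter: age > 18 -- evaluated against the calculated column
           boolean keep = (int) row.get("age") > 18;
           System.out.println(keep); // true, because 10 * 2 = 20 > 18
       }
   }
   ```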
   

