Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-06-04 Thread via GitHub


PatrickRen commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2146945078

   @loserwang1024 Could you cherry-pick this commit to the release-3.1 branch? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-06-04 Thread via GitHub


PatrickRen merged PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-29 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2138528049

   > Hi @loserwang1024, could you please backport this patch to the `release-3.1` branch so that it can be released with CDC 3.1.1?
   
   I'd like to, but it hasn't been merged to master yet.





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2136650327

   Hi @loserwang1024, could you please backport this patch to the `release-3.1` branch so that it can be released with CDC 3.1.1?





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-13 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2106754276

   @PatrickRen, CC. Could you help review this PR?





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594884935


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##
@@ -71,7 +72,9 @@ public void translate(
 }
 }
 
-private void sinkTo(
+/** Only visible for test */
+@VisibleForTesting
+protected void sinkTo(

Review Comment:
   Thanks for your advice; that seems better.






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1594878296


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";

Review Comment:
   I tried that before, but it fails with:
   java.lang.IllegalArgumentException: Node hash must be a 32 character String that describes a hex code.
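
For illustration, the constraint in that exception message can be sketched in plain Java. This is only a sketch: `NodeHashSketch`, `isValidNodeHash`, and `randomNodeHash` are hypothetical names, not Flink API, and the pattern is an assumption derived from the message text (exactly 32 hexadecimal characters). It shows why a descriptive sentence is rejected as a node hash while a dash-free UUID passes.

```java
import java.util.UUID;
import java.util.regex.Pattern;

// Plain-Java sketch; the class and method names are hypothetical, not Flink API.
final class NodeHashSketch {

    // Assumed rule, taken from the exception message: exactly 32 hex characters.
    private static final Pattern NODE_HASH = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Returns true only if the candidate is a 32-character hex string.
    static boolean isValidNodeHash(String candidate) {
        return candidate != null && NODE_HASH.matcher(candidate).matches();
    }

    // A random UUID with its four dashes removed is 32 hex characters long.
    static String randomNodeHash() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        // The descriptive string suggested in review contains spaces and non-hex letters.
        System.out.println(isValidNodeHash("Uid set by the addPreWriteTopology topology")); // false
        System.out.println(isValidNodeHash(randomNodeHash())); // true
    }
}
```

Under this assumption, the descriptive intent would have to live in a code comment or a variable name rather than in the hash value itself.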






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593954989


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";
+MockPreWriteWithoutCommitSink mockPreWriteWithoutCommitSink =
+new MockPreWriteWithoutCommitSink(uid);
+translator.sinkTo(
+inputStream,
+mockPreWriteWithoutCommitSink,
+"testPreWriteWithoutCommitSink",
+new OperatorID());
+OneInputTransformation oneInputTransformation =
+(OneInputTransformation) env.getTransformations().get(0);
+Transformation reblanceTransformation = oneInputTransformation.getInputs().get(0);
+Assert.assertEquals(uid, reblanceTransformation.getUserProvidedNodeHash());

Review Comment:
   Maybe a comment, like:
   ```
   // Check that `addPreWriteTopology` is called and that the uid is set when the transformation is added
   ```






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593952991


##
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.composer.flink.translator;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** A test for {@link DataSinkTranslator} */
+public class DataSinkTranslatorTest {
+
+@Test
+public void testPreWriteWithoutCommitSink() {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ArrayList mockEvents = Lists.newArrayList(new EmptyEvent(), new EmptyEvent());
+DataStreamSource inputStream = env.fromCollection(mockEvents);
+DataSinkTranslator translator = new DataSinkTranslator();
+
+String uid = "";

Review Comment:
   nit: Maybe some more descriptive content, like
   
   ```
   String uid = "Uid set by the addPreWriteTopology topology";
   ```






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


pvary commented on code in PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#discussion_r1593951427


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##
@@ -71,7 +72,9 @@ public void translate(
 }
 }
 
-private void sinkTo(
+/** Only visible for test */
+@VisibleForTesting
+protected void sinkTo(

Review Comment:
   nit: The comment says the same thing as the annotation, so it is not needed.
   We can also make this package-private (slightly lower visibility than `protected`).
   
   ```
   @VisibleForTesting
   void sinkTo(
   ```






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-08 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2099986993

   > Could you define your own one in the test itself? Then you have a free hand in what it does and does not do...
   
   Done.
   





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-02 Thread via GitHub


pvary commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2090855121

   > > Could you please add a test case to prevent later code changes from reverting this fix?
   > 
   > I'd like to, but there seems to be no pipeline sink right now that is a WithPreWriteTopology but not a TwoPhaseCommittingSink, unless I mock one.
   
   Could you define your own one in the test itself?
   Then you have a free hand in what it does and does not do...
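
A minimal sketch of that idea in plain Java, for illustration only. The nested interfaces `SketchSink`, `PreWriteCapable`, and `TwoPhaseCapable` are hypothetical stand-ins, not Flink's real `WithPreWriteTopology` and `TwoPhaseCommittingSink`, and the "ignoring" method is just an assumed shape of the problem named in the PR title, not the actual `DataSinkTranslator` code. The test-local mock implements the pre-write interface only, which is the case no existing pipeline sink covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins throughout; none of these types are Flink API.
final class PreWriteDispatchSketch {

    interface SketchSink {}

    interface PreWriteCapable extends SketchSink {
        List<String> addPreWriteTopology(List<String> input);
    }

    interface TwoPhaseCapable extends SketchSink {}

    // Assumed buggy shape: the pre-write step is only reached on the two-phase branch.
    static List<String> sinkToIgnoringPreWrite(List<String> input, SketchSink sink) {
        List<String> stream = input;
        if (sink instanceof TwoPhaseCapable) {
            if (sink instanceof PreWriteCapable) {
                stream = ((PreWriteCapable) sink).addPreWriteTopology(stream);
            }
        }
        return stream;
    }

    // Assumed fixed shape: the pre-write step is applied whether or not the sink commits.
    static List<String> sinkToApplyingPreWrite(List<String> input, SketchSink sink) {
        List<String> stream = input;
        if (sink instanceof PreWriteCapable) {
            stream = ((PreWriteCapable) sink).addPreWriteTopology(stream);
        }
        return stream;
    }

    // Test-local mock: has a pre-write step but is not a two-phase committing sink.
    static final class MockPreWriteOnlySink implements PreWriteCapable {
        @Override
        public List<String> addPreWriteTopology(List<String> input) {
            List<String> out = new ArrayList<>(input);
            out.add("pre-write"); // marker that proves the pre-write step ran
            return out;
        }
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "b");
        SketchSink mock = new MockPreWriteOnlySink();
        System.out.println(sinkToIgnoringPreWrite(events, mock).size()); // 2: step skipped
        System.out.println(sinkToApplyingPreWrite(events, mock).size()); // 3: step applied
    }
}
```

Because the mock lives in the test, it can leave an observable marker behind, and an assertion on that marker fails if a later change reintroduces the skipped branch.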





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-02 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2089709636

   > Could you please add a test case to prevent later code changes from reverting this fix?
   
   I'd like to, but there seems to be no pipeline sink right now that is a WithPreWriteTopology but not a TwoPhaseCommittingSink, unless I mock one.
   
   





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-04-18 Thread via GitHub


pvary commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2065143602

   Good catch @loserwang1024!
   
   Could you please add a test case to prevent later code changes from reverting this fix?





Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-04-17 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2062883633

   @PatrickRen , CC

