Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-05-22 Thread via GitHub


hk-lrzy commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1609878354


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,11 +142,23 @@ public Schema load(TableId tableId) {
                 });
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        if (context.isRestored()) {

Review Comment:
   Why does only a restore from a checkpoint need the refresh request?
   Could the following case occur?
   
   1. The job starts.
   2. The schema operator receives some schema change events and sends them to the coordinator.
   3. The operator fails before the schema operator gets a response, and there is no checkpoint.
   4. The schema operator restarts and sends a new `CreateTableEvent` to the coordinator.
   5. The coordinator still has pending events, so the new `CreateTableEvent` is put in the pending list and the schema operator blocks forever.
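
The case above can be sketched with a toy model. This is a minimal sketch, assuming the coordinator serves pending schema changes strictly in arrival order; `PendingListSketch`, `submit` and `refresh` are hypothetical names for illustration and are not the real `SchemaRegistryRequestHandler` API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a coordinator-side pending list (hypothetical, not Flink CDC code). */
class PendingListSketch {
    private final Deque<String> pending = new ArrayDeque<>();

    /** Queues an event; returns true only if it is at the head and may proceed. */
    boolean submit(String event) {
        pending.addLast(event);
        return pending.peekFirst().equals(event);
    }

    /** Models a refresh request that drops stale entries left by a failed operator. */
    void refresh() {
        pending.clear();
    }

    public static void main(String[] args) {
        PendingListSketch coordinator = new PendingListSketch();

        // Step 2: the operator sends an event but never sees the response.
        coordinator.submit("AddColumnEvent#1");

        // Steps 3-4: the operator fails without a checkpoint, restarts, and sends a new event.
        // The stale entry is still at the head, so the new event cannot proceed.
        System.out.println("without refresh: " + coordinator.submit("CreateTableEvent#2"));

        // If the restarted operator refreshed the list first, the new event would proceed.
        coordinator.refresh();
        System.out.println("with refresh: " + coordinator.submit("CreateTableEvent#2"));
    }
}
```

In this model the restart path behaves the same whether or not state was restored, which is the point of the question: a guard on `context.isRestored()` alone would skip the refresh in step 4.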
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-24 Thread via GitHub


leonardBang merged PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-23 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2072115980

   > There is an error in the CI scenario that uses route, and I am trying to fix it.
   
   Fixed.
   
   





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-23 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2072051290

   There is an error in the CI scenario that uses route, and I am trying to fix it.





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-22 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2071458830

   @leonardBang Could you help start a CI workflow?





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-17 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1568374817


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
                     .withDescription(
                             "The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators.");
 
+    public static final ConfigOption<Duration> PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+            ConfigOptions.key("schema-operator.rpc-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(SCHEMA_OPERATOR_RPC_TIMEOUT_SECOND_DEFAULT))
+                    .withDescription(
+                            "The timeout time for SchemaOperator to wait for schema change. the default value is 3 min.");

Review Comment:
   Addressed it and rebased to master.






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-16 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1567483688


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
                     .withDescription(
                             "The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators.");
 
+    public static final ConfigOption<Duration> PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+            ConfigOptions.key("schema-operator.rpc-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(SCHEMA_OPERATOR_RPC_TIMEOUT_SECOND_DEFAULT))
+                    .withDescription(
+                            "The timeout time for SchemaOperator to wait for schema change. the default value is 3 min.");

Review Comment:
   ```suggestion
   "The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes.");
   ```



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -23,12 +23,16 @@
 import org.apache.flink.cdc.common.configuration.description.Description;
 import org.apache.flink.cdc.common.configuration.description.ListElement;
 
+import java.time.Duration;
+
 import static org.apache.flink.cdc.common.configuration.description.TextElement.text;
 
 /** Predefined pipeline configuration options. */
 @PublicEvolving
 public class PipelineOptions {
 
+    public static final Integer SCHEMA_OPERATOR_RPC_TIMEOUT_SECOND_DEFAULT = 3 * 60;

Review Comment:
   ```suggestion
   public static final Duration DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT = Duration.ofMinutes(3);
   ```
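
As a quick check of the suggestion, here is a minimal sketch showing that the two ways of writing the default are equal as `java.time.Duration` values. The wrapper class `TimeoutDefaultSketch` is hypothetical; only the two constant names come from the diff and the suggestion.

```java
import java.time.Duration;

/** Compares the Integer-seconds default with the suggested Duration default. */
class TimeoutDefaultSketch {
    // Form in the diff: a number of seconds that callers must wrap themselves.
    static final Integer SCHEMA_OPERATOR_RPC_TIMEOUT_SECOND_DEFAULT = 3 * 60;

    // Suggested form: the unit is part of the type, so no conversion is needed.
    static final Duration DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT = Duration.ofMinutes(3);

    public static void main(String[] args) {
        Duration fromSeconds = Duration.ofSeconds(SCHEMA_OPERATOR_RPC_TIMEOUT_SECOND_DEFAULT);
        // Both constants describe the same length of time.
        System.out.println(fromSeconds.equals(DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT));
        // ISO-8601 rendering of the suggested default.
        System.out.println(DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
    }
}
```

With the `Duration` constant, the option definition can pass the default straight to `defaultValue(...)` without a `Duration.ofSeconds(...)` wrapper.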






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554820774


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
                     .withDescription(
                             "The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators.");
 
+    public static final ConfigOption<Duration> PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+            ConfigOptions.key("schema-operator-rpc-timeout")

Review Comment:
   Addressed it.






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554795627


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
                     .withDescription(
                             "The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators.");
 
+    public static final ConfigOption<Duration> PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+            ConfigOptions.key("schema-operator-rpc-timeout")

Review Comment:
   ```suggestion
   ConfigOptions.key("schema-operator.rpc-timeout")
   ```






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554795438


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,4 +136,13 @@ RESPONSE sendRequestToCoordinator(REQUEST request) {
                     "Failed to send request to coordinator: " + request.toString(), e);
         }
     }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        if (context.isRestored()) {
+            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review Comment:
   Got it. It would be better to add this explanation as a code comment.






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2041034979

   > Thanks @lvyanquan for the fix, I left some comments. I also think tests are necessary, e.g. a test that mocks an RPC timeout when processing a schema change event would be welcome.
   
   Added new tests:
   https://github.com/apache/flink-cdc/pull/3128/files#diff-c3455d69be8d51aab2230d40c29944f6d451116539098421169a38a7469ed3c7R114
 





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554559194


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,4 +136,13 @@ RESPONSE sendRequestToCoordinator(REQUEST request) {
                     "Failed to send request to coordinator: " + request.toString(), e);
         }
     }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        if (context.isRestored()) {
+            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review Comment:
   There may be multiple subtasks for the SchemaOperator, but we only want to clear the information in the SchemaRegistryHandler once during a restart.
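
The effect of the guard can be illustrated without Flink. This is a minimal sketch, assuming each restored subtask would otherwise send one refresh request; `SubtaskZeroSketch` and `countRefreshRequests` are hypothetical names, and the loop stands in for the parallel subtasks.

```java
/** Counts refresh requests sent on restore, with and without the subtask-0 guard. */
class SubtaskZeroSketch {
    /**
     * Simulates every subtask of an operator running its restore logic.
     * Returns how many refresh requests would reach the coordinator.
     */
    static int countRefreshRequests(int parallelism, boolean guardOnSubtaskZero) {
        int refreshRequests = 0;
        for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
            // With the guard, only the subtask with index 0 sends the request.
            if (!guardOnSubtaskZero || subtaskIndex == 0) {
                refreshRequests++;
            }
        }
        return refreshRequests;
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + countRefreshRequests(4, false));
        System.out.println("guarded:   " + countRefreshRequests(4, true));
    }
}
```

Index 0 is a convenient choice because every operator has a subtask with that index regardless of its parallelism.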






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554559016


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -111,8 +115,13 @@ private SchemaChangeResponse requestSchemaChange(
         return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
     }
 
-    private ReleaseUpstreamResponse requestReleaseUpstream() {
-        return sendRequestToCoordinator(new ReleaseUpstreamRequest());
+    private void requestReleaseUpstream() throws InterruptedException {
+        CoordinationResponse coordinationResponse =
+                sendRequestToCoordinator(new ReleaseUpstreamRequest());
+        while (coordinationResponse instanceof SchemaChangeProcessingResponse) {

Review Comment:
   Done. Added a new pipeline config option to control this:
   https://github.com/apache/flink-cdc/pull/3128/files#diff-efc0e20a0c40e81813d3be00eadab7bc066598e521b24a85f2c3e5c9b83c2b4bR92



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.event;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/** request for get change result. */

Review Comment:
   done.






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-03 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1549367497


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -111,8 +115,13 @@ private SchemaChangeResponse requestSchemaChange(
         return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
     }
 
-    private ReleaseUpstreamResponse requestReleaseUpstream() {
-        return sendRequestToCoordinator(new ReleaseUpstreamRequest());
+    private void requestReleaseUpstream() throws InterruptedException {
+        CoordinationResponse coordinationResponse =
+                sendRequestToCoordinator(new ReleaseUpstreamRequest());
+        while (coordinationResponse instanceof SchemaChangeProcessingResponse) {

Review Comment:
   A continuous loop may not be acceptable, as it may lead to endless coordination. Could we introduce a timeout config option with a reasonable default value for coordination?
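
One way to bound such a loop is to poll against a deadline. This is a minimal sketch, assuming the operator can ask whether the schema change has finished; `BoundedPollSketch`, `awaitFinished` and the `BooleanSupplier` stand in for the real request and response exchange and are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Polls a condition until it holds or a deadline passes. */
class BoundedPollSketch {
    static void awaitFinished(BooleanSupplier isFinished, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        // nanoTime is monotonic, so the deadline is unaffected by wall-clock changes.
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!isFinished.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Schema change did not finish within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // A condition that becomes true on the third poll.
        int[] polls = {0};
        awaitFinished(() -> ++polls[0] >= 3, Duration.ofSeconds(1), Duration.ofMillis(10));
        System.out.println("finished after " + polls[0] + " polls");

        // A condition that never becomes true hits the timeout instead of looping forever.
        try {
            awaitFinished(() -> false, Duration.ofMillis(50), Duration.ofMillis(10));
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The timeout would come from a config option, so a stuck coordinator surfaces as a failure rather than a silent hang.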



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.event;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/** request for get change result. */

Review Comment:
   Could you improve your Javadoc, using other classes as a reference?



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,4 +136,13 @@ RESPONSE sendRequestToCoordinator(REQUEST request) {
                     "Failed to send request to coordinator: " + request.toString(), e);
         }
     }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        if (context.isRestored()) {
+            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review Comment:
   I didn't understand this limitation. Could you explain it?






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-18 Thread via GitHub


LYanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2003238562

   > Should we clear the flushedSinkWriters as well?
   
   Thanks for this suggestion, addressed.
   





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-18 Thread via GitHub


LYanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2003214710

   @fanqiejiang8 Hi, I have added a `getRuntimeContext().getIndexOfThisSubtask() == 0` check to make sure that the request is not sent multiple times.





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-18 Thread via GitHub


BaoPiao commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2003029727

   > @BaoPiao Yeah, this is because SchemaRegistryRequestHandler doesn't clear pendingSchemaChanges when SchemaOperator restarts. I've added one commit [7aba6a4](https://github.com/apache/flink-cdc/commit/7aba6a4489486af9fece954f561dca0c9c443006) to fix it.
   
   Should we clear the flushedSinkWriters as well?





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-18 Thread via GitHub


yanghuaiGit commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2003023705

   
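   If the pending list in the schema registry holds multiple schema events waiting to be executed, isSchemaChangeApplying only indicates that some schema change event is being applied; it cannot tell which one. Because isSchemaChangeApplying cannot be bound to the client's event, the SchemaOperator may have to wait until all schema events have been executed before it becomes non-blocking.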





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-17 Thread via GitHub


fanqiejiang8 commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2002855730

   @lvyanquan @BaoPiao When the task parallelism is greater than one, every subtask sends a RefreshPendingListsRequest to the SchemaRegistry on task retry. Won't these repeated requests cause data inconsistency in pendingSchemaChanges?
   
   





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-17 Thread via GitHub


LYanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2002529227

   @BaoPiao Yeah, this is because SchemaRegistryRequestHandler doesn't clear pendingSchemaChanges when SchemaOperator restarts. I've added one commit to fix it:
   https://github.com/apache/flink-cdc/pull/3128/commits/7aba6a4489486af9fece954f561dca0c9c443006
   





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-14 Thread via GitHub


BaoPiao commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-1996731322

   If the SchemaRegistryRequestHandler is in the midst of executing a schema change, and the task manager where the SchemaOperator resides undergoes a restart, could this potentially result in the task being stuck indefinitely?





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-14 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1524312973


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##
@@ -70,6 +81,8 @@ public SchemaRegistryRequestHandler(
         this.flushedSinkWriters = new HashSet<>();
         this.pendingSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
+        schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+        isSchemaChangeApplying = true;

Review Comment:
   done.






Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-03-13 Thread via GitHub


ruanhang1993 commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1524276055


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##
@@ -70,6 +81,8 @@ public SchemaRegistryRequestHandler(
         this.flushedSinkWriters = new HashSet<>();
         this.pendingSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
+        schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+        isSchemaChangeApplying = true;

Review Comment:
   isSchemaChangeApplying = false;


