Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-30 Thread via GitHub


herunkang2018 commented on PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#issuecomment-2509504897

   @lvyanquan Thanks for your contribution. I left some comments here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-30 Thread via GitHub


herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864688216


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java:
##
@@ -125,6 +129,9 @@ public class MongoDBTableFactoryTest {
     private static final boolean SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT =
             SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
 
+    private static final AssignStrategy SCAN_CHUNK_ASSIGN_STRATEGY_DEFAULT =
Review Comment:
   Could we add a test in MongoDBConnectorITCase for the DESCENDING_ORDER 
AssignStrategy? That would verify result correctness for the new assignment 
strategy.
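   For reference, the core of such an order-insensitivity check could be sketched as below (stand-in names, not the real MongoDBConnectorITCase API): with DESCENDING_ORDER the splits are assigned in reverse, but the snapshot rows should be identical once order is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model: rows produced under ascending vs. descending assignment
// must match as multisets, even though the assignment order differs.
public class OrderInsensitiveCheck {
    static boolean sameRows(List<String> a, List<String> b) {
        List<String> sa = new ArrayList<>(a);
        List<String> sb = new ArrayList<>(b);
        Collections.sort(sa);
        Collections.sort(sb);
        return sa.equals(sb);
    }

    public static void main(String[] args) {
        List<String> ascending = List.of("row-0", "row-1", "row-2");
        List<String> descending = new ArrayList<>(ascending);
        Collections.reverse(descending);

        System.out.println(descending.get(0));               // row-2: order differs
        System.out.println(sameRows(ascending, descending)); // true: content matches
    }
}
```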






Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-30 Thread via GitHub


herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864686042


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/MongoDBChunkSplitter.java:
##
@@ -38,10 +40,15 @@ public MongoDBChunkSplitter(MongoDBSourceConfig sourceConfig) {
 
     @Override
     public Collection<SnapshotSplit> generateSplits(TableId collectionId) {
+        ArrayList<SnapshotSplit> snapshotSplits = new ArrayList<>();
         SplitContext splitContext = SplitContext.of(sourceConfig, collectionId);
         if (splitContext.isShardedCollection()) {
-            return ShardedSplitStrategy.INSTANCE.split(splitContext);
+            snapshotSplits.addAll(ShardedSplitStrategy.INSTANCE.split(splitContext));
         }
-        return SplitVectorSplitStrategy.INSTANCE.split(splitContext);
+        snapshotSplits.addAll(SplitVectorSplitStrategy.INSTANCE.split(splitContext));

Review Comment:
   Seems it will split the collection repeatedly here: for a sharded 
collection, both ShardedSplitStrategy and SplitVectorSplitStrategy are now 
applied, whereas the code previously returned after the first one.
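   To illustrate the concern, here is a simplified, self-contained model of the control flow (placeholder strings stand in for the real SnapshotSplit objects and strategy classes):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the splitter control flow in the PR; the
// strategy calls return placeholder strings, not real SnapshotSplits.
public class SplitFlowDemo {
    static List<String> shardedSplit() {
        return List.of("sharded-0", "sharded-1");
    }

    static List<String> splitVectorSplit() {
        return List.of("vector-0", "vector-1");
    }

    // Control flow as written in the PR: a sharded collection is split by
    // BOTH strategies, duplicating the splits.
    static List<String> generateSplitsAsInPr(boolean sharded) {
        List<String> splits = new ArrayList<>();
        if (sharded) {
            splits.addAll(shardedSplit());
        }
        splits.addAll(splitVectorSplit()); // runs even when sharded is true
        return splits;
    }

    // Original behavior: exactly one strategy is chosen per collection.
    static List<String> generateSplitsOriginal(boolean sharded) {
        return sharded ? shardedSplit() : splitVectorSplit();
    }

    public static void main(String[] args) {
        System.out.println(generateSplitsAsInPr(true).size());   // 4
        System.out.println(generateSplitsOriginal(true).size()); // 2
    }
}
```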






Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-30 Thread via GitHub


herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864686969


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/AssignStrategy.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+
+/** Strategy for {@link MongoDBChunkSplitter} to assign {@link SnapshotSplit}. */
+public enum AssignStrategy {

Review Comment:
   Could we move this class to the flink-cdc-base module and reuse it in the 
MySQL and MongoDB source modules?
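   If the class were moved into flink-cdc-base as suggested, a minimal connector-agnostic version might look like the sketch below. Only DESCENDING_ORDER is confirmed by this thread; the ASCENDING_ORDER constant and the fromString helper are assumptions for illustration.

```java
// Hypothetical shared enum for flink-cdc-base. DESCENDING_ORDER comes from
// the review thread; ASCENDING_ORDER is an assumed default value.
public enum AssignStrategy {
    ASCENDING_ORDER,
    DESCENDING_ORDER;

    // Hypothetical case-insensitive parser for the config-option value.
    public static AssignStrategy fromString(String value) {
        return AssignStrategy.valueOf(value.trim().toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(fromString("descending_order")); // DESCENDING_ORDER
    }
}
```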






Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-11 Thread via GitHub


lvyanquan commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1837379995


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java:
##
@@ -157,4 +158,27 @@ public class MongoDBSourceOptions {
                     .defaultValue(true)
                     .withDescription(
                             "MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
+
+    public static final ConfigOption<Boolean> SCAN_FLATTEN_NESTED_COLUMNS_ENABLED =
+            ConfigOptions.key("scan.flatten-nested-columns.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to recursively flatten the Bson field into columns. "
+                                    + "For a better understanding, the name of the flattened column will be composed of the path to get the column. "
+                                    + "For example, the field `col` in the Bson document {\"nested\": {\"col\": true}} is `nested.col` in the flattened schema.");
+
+    public static final ConfigOption<Boolean> SCAN_PRIMITIVE_AS_STRING =
+            ConfigOptions.key("scan.primitive-as-string")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to infer primitive types as string type.");

Review Comment:
   removed.






Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-09 Thread via GitHub


yuxiqian commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1835436798


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java:
##
@@ -157,4 +158,27 @@ public class MongoDBSourceOptions {
                     .defaultValue(true)
                     .withDescription(
                             "MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
+
+    public static final ConfigOption<Boolean> SCAN_FLATTEN_NESTED_COLUMNS_ENABLED =
+            ConfigOptions.key("scan.flatten-nested-columns.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to recursively flatten the Bson field into columns. "
+                                    + "For a better understanding, the name of the flattened column will be composed of the path to get the column. "
+                                    + "For example, the field `col` in the Bson document {\"nested\": {\"col\": true}} is `nested.col` in the flattened schema.");
+
+    public static final ConfigOption<Boolean> SCAN_PRIMITIVE_AS_STRING =
+            ConfigOptions.key("scan.primitive-as-string")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to infer primitive types as string type.");

Review Comment:
   Irrelevant change?






[PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]

2024-11-08 Thread via GitHub


lvyanquan opened a new pull request, #3704:
URL: https://github.com/apache/flink-cdc/pull/3704

   Introduce a new config option `scan.chunk.assign.strategy` to control the order of split assignment.
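   A hypothetical Flink SQL usage sketch (the table name, hosts, and other option values are illustrative; only the `scan.chunk.assign.strategy` key and the DESCENDING_ORDER value appear in this thread):

```sql
CREATE TABLE orders (
  _id STRING,
  amount DECIMAL(10, 2),
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'mydb',
  'collection' = 'orders',
  -- new option from this PR: control the order in which snapshot chunks are assigned
  'scan.chunk.assign.strategy' = 'DESCENDING_ORDER'
);
```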

