Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
herunkang2018 commented on PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#issuecomment-2509504897

@lvyanquan Thanks for your contribution. I left some comments here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864688216

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java:
## @@ -125,6 +129,9 @@ public class MongoDBTableFactoryTest {
     private static final boolean SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT =
             SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
+    private static final AssignStrategy SCAN_CHUNK_ASSIGN_STRATEGY_DEFAULT =

Review Comment:
Could we add a test in MongoDBConnectorITCase for the DESCENDING_ORDER AssignStrategy? It would verify that the new assign strategy still produces correct results.
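A minimal, self-contained sketch of the property such an IT case could assert, assuming DESCENDING_ORDER means assigning the generated chunks in reverse (the actual assigner may behave differently). DescendingAssignCheck, its Chunk type, and the integer keys are hypothetical stand-ins for snapshot splits and documents, not Flink CDC APIs. Because only the arrival order should change, the comparison ignores order while still catching missing or duplicated records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model: a chunked snapshot read checked for completeness under reversed assignment. */
final class DescendingAssignCheck {

    /** Hypothetical stand-in for a snapshot split: the half-open key range [start, end). */
    static final class Chunk {
        final int start;
        final int end;

        Chunk(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Splits keys [0, total) into consecutive chunks of at most chunkSize keys. */
    static List<Chunk> split(int total, int chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        for (int start = 0; start < total; start += chunkSize) {
            chunks.add(new Chunk(start, Math.min(start + chunkSize, total)));
        }
        return chunks;
    }

    /** Simulates reading every key of every chunk, in the given assignment order. */
    static List<Integer> read(List<Chunk> assignmentOrder) {
        List<Integer> keys = new ArrayList<>();
        for (Chunk chunk : assignmentOrder) {
            for (int key = chunk.start; key < chunk.end; key++) {
                keys.add(key);
            }
        }
        return keys;
    }

    /** True when both lists hold the same keys with the same multiplicities. */
    static boolean sameIgnoringOrder(List<Integer> expected, List<Integer> actual) {
        List<Integer> e = new ArrayList<>(expected);
        List<Integer> a = new ArrayList<>(actual);
        Collections.sort(e);
        Collections.sort(a);
        return e.equals(a);
    }

    public static void main(String[] args) {
        List<Chunk> ascending = split(10, 3);
        List<Chunk> descending = new ArrayList<>(ascending);
        Collections.reverse(descending); // assumed meaning of DESCENDING_ORDER

        List<Integer> ascKeys = read(ascending);
        List<Integer> descKeys = read(descending);
        System.out.println(ascKeys.equals(descKeys));             // false: arrival order differs
        System.out.println(sameIgnoringOrder(ascKeys, descKeys)); // true: same records
    }
}
```

In a real IT case the keys would be the rows collected from the source, but the assertion shape stays the same: compare as multisets, not as sequences.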
Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864686042

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/MongoDBChunkSplitter.java:
## @@ -38,10 +40,15 @@ public MongoDBChunkSplitter(MongoDBSourceConfig sourceConfig) {
     @Override
     public Collection<SnapshotSplit> generateSplits(TableId collectionId) {
+        ArrayList<SnapshotSplit> snapshotSplits = new ArrayList<>();
         SplitContext splitContext = SplitContext.of(sourceConfig, collectionId);
         if (splitContext.isShardedCollection()) {
-            return ShardedSplitStrategy.INSTANCE.split(splitContext);
+            snapshotSplits.addAll(ShardedSplitStrategy.INSTANCE.split(splitContext));
         }
-        return SplitVectorSplitStrategy.INSTANCE.split(splitContext);
+        snapshotSplits.addAll(SplitVectorSplitStrategy.INSTANCE.split(splitContext));

Review Comment:
It seems the collection will be split twice here. The early return was replaced by addAll without an else branch, so a sharded collection now goes through both ShardedSplitStrategy and SplitVectorSplitStrategy.
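A minimal sketch of control flow that keeps the collected list while restoring the original "one strategy per collection" behaviour. shardedSplits and splitVectorSplits are hypothetical stand-ins for ShardedSplitStrategy.INSTANCE.split(...) and SplitVectorSplitStrategy.INSTANCE.split(...), and plain strings stand in for SnapshotSplit; this is an illustration, not the PR's final code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical model of generateSplits: exactly one split strategy runs per collection. */
final class GenerateSplitsSketch {

    // Hypothetical stand-ins for the two MongoDB split strategies; they only label their output.
    static List<String> shardedSplits(String collection) {
        return List.of(collection + "#shard-0", collection + "#shard-1");
    }

    static List<String> splitVectorSplits(String collection) {
        return List.of(collection + "#vector-0");
    }

    static Collection<String> generateSplits(String collection, boolean sharded) {
        List<String> snapshotSplits = new ArrayList<>();
        if (sharded) {
            snapshotSplits.addAll(shardedSplits(collection));
        } else {
            // The else branch mirrors the original early return: no second split pass.
            snapshotSplits.addAll(splitVectorSplits(collection));
        }
        return snapshotSplits;
    }

    public static void main(String[] args) {
        System.out.println(generateSplits("db.sharded", true)); // [db.sharded#shard-0, db.sharded#shard-1]
        System.out.println(generateSplits("db.plain", false));  // [db.plain#vector-0]
    }
}
```

Collecting into a list first (rather than returning directly) leaves room to reorder the splits afterwards, which is presumably why the PR introduced snapshotSplits.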
Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
herunkang2018 commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1864686969

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/AssignStrategy.java:
## @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+
+/** Strategy for {@link MongoDBChunkSplitter} to assign {@link SnapshotSplit}. */
+public enum AssignStrategy {

Review Comment:
Could we move this class to the flink-cdc-base module and reuse it in both the MySQL and MongoDB source modules?
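One way a connector-agnostic version could look, sketched under assumptions: only DESCENDING_ORDER is named in this thread, so ASCENDING_ORDER, the reorder(...) method, and the name SharedAssignStrategy are hypothetical. Making the method generic over the split type is what would let the MySQL and MongoDB splitters share it without depending on each other's split classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical connector-agnostic assign strategy for a shared base module.
 * ASCENDING_ORDER and reorder(...) are illustrative assumptions.
 */
enum SharedAssignStrategy {
    /** Assign splits in the order the splitter generated them. */
    ASCENDING_ORDER,
    /** Assign splits starting from the last generated one. */
    DESCENDING_ORDER;

    /** Returns a reordered copy; the input list is left untouched. */
    <T> List<T> reorder(List<T> generatedSplits) {
        List<T> ordered = new ArrayList<>(generatedSplits);
        if (this == DESCENDING_ORDER) {
            Collections.reverse(ordered);
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> splits = List.of("chunk-0", "chunk-1", "chunk-2");
        System.out.println(ASCENDING_ORDER.reorder(splits));  // [chunk-0, chunk-1, chunk-2]
        System.out.println(DESCENDING_ORDER.reorder(splits)); // [chunk-2, chunk-1, chunk-0]
        // Parsing a config value by constant name; the real option's accepted values may differ.
        System.out.println(SharedAssignStrategy.valueOf("DESCENDING_ORDER")); // DESCENDING_ORDER
    }
}
```

Reordering a copy at assignment time keeps each connector's splitting logic unchanged; only the order in which splits are handed out differs.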
Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
lvyanquan commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1837379995

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java:
## @@ -157,4 +158,27 @@ public class MongoDBSourceOptions {
                     .defaultValue(true)
                     .withDescription(
                             "MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
+
+    public static final ConfigOption<Boolean> SCAN_FLATTEN_NESTED_COLUMNS_ENABLED =
+            ConfigOptions.key("scan.flatten-nested-columns.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to recursively flatten the Bson field into columns."
+                                    + "For a better understanding, the name of the flattened column will be composed of the path to get the column. "
+                                    + "For example, the field `col` in the Bson document {\"nested\": {\"col\": true}} is `nested.col` in the flattened schema. ");
+
+    public static final ConfigOption<Boolean> SCAN_PRIMITIVE_AS_STRING =
+            ConfigOptions.key("scan.primitive-as-string")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to infer primitive types as string type.");

Review Comment:
Removed.
Re: [PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
yuxiqian commented on code in PR #3704:
URL: https://github.com/apache/flink-cdc/pull/3704#discussion_r1835436798

## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java:
## @@ -157,4 +158,27 @@ public class MongoDBSourceOptions {
                     .defaultValue(true)
                     .withDescription(
                             "MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that.");
+
+    public static final ConfigOption<Boolean> SCAN_FLATTEN_NESTED_COLUMNS_ENABLED =
+            ConfigOptions.key("scan.flatten-nested-columns.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to recursively flatten the Bson field into columns."
+                                    + "For a better understanding, the name of the flattened column will be composed of the path to get the column. "
+                                    + "For example, the field `col` in the Bson document {\"nested\": {\"col\": true}} is `nested.col` in the flattened schema. ");
+
+    public static final ConfigOption<Boolean> SCAN_PRIMITIVE_AS_STRING =
+            ConfigOptions.key("scan.primitive-as-string")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to infer primitive types as string type.");

Review Comment:
These options look unrelated to scan.chunk.assign.strategy. Is this change intended for this PR?
[PR] [FLINK-36682][mysql][mongodb] Add scan.chunk.assign.strategy to avoid OOM exception. [flink-cdc]
lvyanquan opened a new pull request, #3704:
URL: https://github.com/apache/flink-cdc/pull/3704

Introduce a new config option, scan.chunk.assign.strategy, to control the order in which snapshot splits are assigned.