[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514017050

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +111,63 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+    assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+      "runtimeProvider with `ParallelismProvider` implementation is required")
+
+    val inputParallelism = inputTransformation.getParallelism
+    val parallelism = {
+      val parallelismOptional = runtimeProvider
+        .asInstanceOf[ParallelismProvider].getParallelism
+      if(parallelismOptional.isPresent) {
+        val parallelismPassedIn = parallelismOptional.get().intValue()
+        if(parallelismPassedIn <= 0) {
+          throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+              "should not be less than zero or equal to zero")
+        }
+        parallelismPassedIn
+      } else inputParallelism
+    }
+
+    val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+    val theFinalInputTransformation =
+      (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+        // if the inputParallelism equals parallelism, do nothing.
+        case (true, _, _) => inputTransformation
+        case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+        case (_, _, Nil) =>
+          throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+        case (_, _, pks) =>
+          //key by before sink
+          //according to [[StreamExecExchange]]
+          val selector = KeySelectorUtil.getRowDataSelector(
+            pks.toArray, inputTypeInfo)
+          // in case of maxParallelism is negative
+          val keyGroupNum = env.getMaxParallelism match {

Review comment:
   OK, so should we just use `DEFAULT_LOWER_BOUND_MAX_PARALLELISM`?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
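The decision this revision makes before the sink, and the key-group fallback asked about in the reply, can be modelled outside Flink. The sketch below is a simplified, hypothetical model (the class, method and enum names are illustrative, not Flink API, and `IllegalStateException` stands in for `TableException`): it keeps the input as-is when the parallelism is unchanged or the changelog is insert-only, rejects a changed parallelism without a primary key, and otherwise asks for a key-by. For the key count it falls back to 128, the value of `KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM`, whenever the environment's max parallelism is not positive. The routing helper uses Flink's `keyGroup * parallelism / maxParallelism` mapping but, as a simplification, skips the murmur hash Flink applies to the key's hash code.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of how the sink input is prepared; not Flink API. */
final class SinkInputPlanner {

    /** Local copy of the row kinds in Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Plan { FORWARD, KEY_BY }

    /** Same value as KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7). */
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;

    static Plan plan(int inputParallelism, int sinkParallelism,
                     Set<RowKind> changelog, List<Integer> primaryKeys) {
        if (inputParallelism == sinkParallelism) {
            return Plan.FORWARD; // parallelism unchanged: nothing to do
        }
        if (changelog.equals(Set.of(RowKind.INSERT))) {
            return Plan.FORWARD; // insert-only rows carry no per-key ordering requirement
        }
        if (primaryKeys.isEmpty()) {
            throw new IllegalStateException(
                "Sink parallelism " + sinkParallelism + " differs from input parallelism "
                    + inputParallelism + ", but changelog " + changelog
                    + " is not insert-only and no primary key is defined");
        }
        return Plan.KEY_BY; // send all changes of one key to the same sink subtask
    }

    /** Number of key groups: the configured max parallelism, or the default when unset. */
    static int keyGroupNum(int envMaxParallelism) {
        return envMaxParallelism > 0 ? envMaxParallelism : DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
    }

    /** Simplified key-group routing (no murmur hash) onto one of sinkParallelism subtasks. */
    static int subtaskFor(Object key, int keyGroups, int sinkParallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), keyGroups);
        return keyGroup * sinkParallelism / keyGroups;
    }
}
```

Because every key maps to a fixed key group and every key group to a fixed subtask, updates and deletes for the same primary key stay in order on one sink instance even after the parallelism changes.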
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513169070

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing.

Review comment:
   @JingsongLi I only apply `keyBy` when the ChangelogMode contains `UPDATE_AFTER`; for every other changelog mode I leave the input transformation unchanged. Is that appropriate?
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
## @@ -20,19 +20,39 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
     /**
      * Helper method for creating a static provider.
      */
     static SinkFunctionProvider of(SinkFunction sinkFunction) {
-        return () -> sinkFunction;
+        return of(sinkFunction, Optional.empty());
+    }
+
+    /**
+    * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+    */
+    static SinkFunctionProvider of(SinkFunction sinkFunction, Optional parallelism) {

Review comment:
   @JingsongLi
   > it is recommended to use the Optional only in method return values

   Copy that.

   > I think we don't need provide method.

   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? It would be kind of you to let me know.
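One way to keep `Optional` out of parameters while still letting callers pass a parallelism is an overload that takes a nullable `Integer` and exposes it only through the `Optional`-returning getter. The sketch below uses hypothetical stand-in interfaces (`Sink`, `ParallelismAware`, `SinkProvider`) instead of Flink's `SinkFunction`, `ParallelismProvider` and `SinkFunctionProvider` so that it compiles on its own; it illustrates the pattern only and is not the API that was eventually merged.

```java
import java.util.Optional;

/** Hypothetical stand-in for a sink function. */
interface Sink<T> {
    void invoke(T value);
}

/** Hypothetical stand-in for ParallelismProvider: empty means "let the planner decide". */
interface ParallelismAware {
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

/** Hypothetical stand-in for SinkFunctionProvider. */
interface SinkProvider<T> extends ParallelismAware {
    Sink<T> createSink();

    /** No explicit parallelism: the planner falls back to its default. */
    static <T> SinkProvider<T> of(Sink<T> sink) {
        return of(sink, null);
    }

    /** A nullable Integer parameter keeps Optional confined to the return type. */
    static <T> SinkProvider<T> of(Sink<T> sink, Integer parallelism) {
        return new SinkProvider<T>() {
            @Override
            public Sink<T> createSink() {
                return sink;
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(parallelism);
            }
        };
    }
}
```

With this shape, `SinkProvider.of(sink)` reports an empty parallelism, while `SinkProvider.of(sink, 3)` reports `Optional.of(3)`.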
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165608

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism

Review comment:
   Since, according to the conversation below, we don't need to do this, it is fine to remove this line.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165407

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism

Review comment:
   Correct me if I am wrong, but `getMaxParallelism` is the upper bound on the parallelism a task can use, while `getParallelism` is the parallelism the task actually runs with.
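To make the distinction concrete: the max parallelism bounds how far an operator can be scaled (in Flink it is also the number of key groups), while the parallelism is the number of subtasks that actually run. A check that a configured sink parallelism stays within the max parallelism could look like the hypothetical helper below (plain Java, not Flink API); it assumes a non-positive max parallelism means "not configured" and therefore imposes no upper bound.

```java
/** Hypothetical helper; not part of Flink. */
final class ParallelismBounds {

    /**
     * Returns true if a sink may run with {@code parallelism} subtasks.
     * A non-positive {@code maxParallelism} is treated as "not configured".
     */
    static boolean isValid(int parallelism, int maxParallelism) {
        if (parallelism <= 0) {
            return false; // at least one subtask is required
        }
        // Without a configured upper bound any positive parallelism is accepted.
        return maxParallelism <= 0 || parallelism <= maxParallelism;
    }
}
```

Comparing against `getParallelism` instead would wrongly reject a sink that is meant to run with more subtasks than its upstream operators.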
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513164185

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()

Review comment:
   My concern is that once a new provider that does not implement `ParallelismProvider` is used, an exception will be thrown if we don't check the type first.
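The concern can be illustrated with a small model: casting unconditionally fails with a `ClassCastException` for a provider type that does not implement the parallelism interface, whereas checking the type first lets such providers fall back to the input parallelism. The interfaces and class below are hypothetical stand-ins, not Flink's types.

```java
import java.util.Optional;

/** Hypothetical marker for runtime providers. */
interface RuntimeProvider {}

/** Hypothetical stand-in for ParallelismProvider. */
interface HasParallelism {
    Optional<Integer> getParallelism();
}

final class SinkParallelismResolver {

    /** Uses the provider's parallelism when it offers one, otherwise the input parallelism. */
    static int resolve(RuntimeProvider provider, int inputParallelism) {
        if (provider instanceof HasParallelism) {
            return ((HasParallelism) provider).getParallelism().orElse(inputParallelism);
        }
        // Casting this provider to HasParallelism would throw ClassCastException.
        return inputParallelism;
    }
}
```

The trade-off is that a silent fallback can hide a provider that was expected to carry a parallelism; the later revision instead asserts that the provider implements `ParallelismProvider`.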
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513163662

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")

Review comment:
   Sure. By the way, should we also throw `TableException` here instead of `RuntimeException`? @JingsongLi
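One argument for a dedicated exception type, as the later revision does with `TableException`, is that callers can tell invalid table configuration apart from other runtime failures. A minimal sketch of the lower-bound check, using a hypothetical `InvalidSinkConfigException` as a stand-in for Flink's `TableException`:

```java
/** Hypothetical stand-in for a table-layer exception such as Flink's TableException. */
class InvalidSinkConfigException extends RuntimeException {
    InvalidSinkConfigException(String message) {
        super(message);
    }
}

final class SinkParallelismValidator {

    /** Returns the parallelism unchanged, or fails if it is zero or negative. */
    static int validate(String tableIdentifier, int parallelism) {
        if (parallelism <= 0) {
            // Name the table so the user can find the offending configuration.
            throw new InvalidSinkConfigException(
                "Table: " + tableIdentifier + " configured sink parallelism: "
                    + parallelism + " should be greater than zero");
        }
        return parallelism;
    }
}
```

Including the table identifier in the message, as the later revision also does, makes the error actionable when a job writes to several sinks.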
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511716192

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing.
+case (_, _) if(containedRowKinds == Set(RowKind.DELETE)) => inputTransformation
+case (_, _) if(containedRowKinds == Set(RowKind.INSERT)) => inputTransformation
+// fixme: for retract mode (insert and delete contains only), is there somethinng to do with? Currently do nothing.
+case (_, _) if(containedRowKinds == Set(RowKind.INSERT,RowKind.DELETE)) => inputTransformation
+case (_, Nil) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => throw new RuntimeException(s"ChangelogMode contains ${RowKind.UPDATE_AFTER}, but no primaryKeys were found")
+case (_, _) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => new DataStream[RowData](env,inputTransformation).keyBy(primaryKeys:_*).getTransformation
+case _ => throw new RuntimeException(s"the changelogMode is: ${containedRowKinds.mkString(",")}, which is not supported")
+ }
+

Review comment:
   I enumerated all the changelog-mode cases with a Scala pattern match, so that each case can easily be modified in its own scope.
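The same case analysis, which applies once the sink and input parallelism differ, can be written in Java with one enum set per case. The sketch below mirrors the cases of the Scala match above: delete-only, insert-only and insert/delete keep the input, `UPDATE_AFTER` requires a primary key and triggers a key-by, and anything else is rejected. `RowKind` is a local copy of the four kinds Flink's `RowKind` defines; `ChangelogDispatch` and `Action` are hypothetical names, and `IllegalStateException` stands in for the exceptions thrown in the diff.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class ChangelogDispatch {

    /** Local copy of the row kinds in Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical outcome of the case analysis. */
    enum Action { KEEP_INPUT, KEY_BY_PRIMARY_KEY }

    static Action decide(Set<RowKind> kinds, List<Integer> primaryKeys) {
        if (kinds.equals(EnumSet.of(RowKind.DELETE))
                || kinds.equals(EnumSet.of(RowKind.INSERT))
                || kinds.equals(EnumSet.of(RowKind.INSERT, RowKind.DELETE))) {
            return Action.KEEP_INPUT; // still an open question in the review: no redistribution
        }
        if (kinds.contains(RowKind.UPDATE_AFTER)) {
            if (primaryKeys.isEmpty()) {
                throw new IllegalStateException(
                    "ChangelogMode contains UPDATE_AFTER, but no primary keys were found");
            }
            return Action.KEY_BY_PRIMARY_KEY; // upsert stream: partition by primary key
        }
        throw new IllegalStateException("Unsupported changelog mode: " + kinds);
    }
}
```

Keeping each case as its own branch preserves the property the comment is after: any single changelog mode can be changed without touching the others.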
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511713970

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
## @@ -99,11 +105,33 @@ class CommonPhysicalSink (
     val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains delete, is there somethinng to do with? Currently do nothing.

Review comment:
   Actually, this part needs a thorough discussion. From my perspective, each changelog mode should be treated differently. So far I have only figured out what should be done in `UPSERT` mode and `INSERT_ONLY` mode. What should we do for the other changelog modes? WDYT?