[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-29 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514017050



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+  "runtimeProvider with `ParallelismProvider` implementation is required")
+
+val inputParallelism = inputTransformation.getParallelism
+val parallelism =  {
+  val parallelismOptional = runtimeProvider
+.asInstanceOf[ParallelismProvider].getParallelism
+  if(parallelismOptional.isPresent) {
+val parallelismPassedIn = parallelismOptional.get().intValue()
+if(parallelismPassedIn <= 0) {
+  throw new TableException(
+s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+  "should not be less than zero or equal to zero")
+}
+parallelismPassedIn
+  } else inputParallelism
+}
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val theFinalInputTransformation =
+  (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+   // if the inputParallelism equals parallelism, do nothing.
+  case (true, _, _) => inputTransformation
+  case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+  case (_, _, Nil) =>
+throw new TableException(
+s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+s"while the input parallelism is: $inputParallelism. " +
+s"Since the changelog mode " +
+s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+s"which is not INSERT_ONLY mode, " +
+s"primary key is required but no primary key is found"
+  )
+  case (_, _, pks) =>
+//key by before sink
+//according to [[StreamExecExchange]]
+val selector = KeySelectorUtil.getRowDataSelector(
+  pks.toArray, inputTypeInfo)
+// in case of maxParallelism is negative
+val keyGroupNum = env.getMaxParallelism match {

Review comment:
   OK, so should we just use `DEFAULT_LOWER_BOUND_MAX_PARALLELISM`?
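For context: `env.getMaxParallelism` is negative (by default `-1`) when no max parallelism has been configured, so it cannot be used directly as the number of key groups. Below is a minimal, Flink-free sketch of the fallback under discussion. It assumes a local constant that mirrors the value of Flink's `KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM` (128), and it assumes the key-group-to-subtask formula `keyGroupId * parallelism / maxParallelism`. The class and method names are hypothetical.

```java
/** Hypothetical helper: resolves the key-group count used when keying the sink input. */
final class KeyGroupSketch {
    // Mirrors the value of Flink's KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7).
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;

    private KeyGroupSketch() {}

    /** A non-positive (unset) max parallelism falls back to the lower-bound default. */
    static int resolveKeyGroupNum(int envMaxParallelism) {
        return envMaxParallelism > 0 ? envMaxParallelism : DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
    }

    /** Maps a key group to the index of the sink subtask that owns it. */
    static int subtaskForKeyGroup(int keyGroupId, int maxParallelism, int parallelism) {
        if (keyGroupId < 0 || keyGroupId >= maxParallelism) {
            throw new IllegalArgumentException("keyGroupId out of range: " + keyGroupId);
        }
        // Contiguous ranges of key groups are assigned to consecutive subtasks.
        return keyGroupId * parallelism / maxParallelism;
    }
}
```

With 128 key groups and a sink parallelism of 4, key groups 0..31 go to subtask 0 and key groups 96..127 go to subtask 3.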





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513169070



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.

Review comment:
   @JingsongLi
   
   I only apply `keyBy` when the ChangelogMode contains `UPDATE_AFTER`. For other changelog modes I leave the transformation unchanged. Is that appropriate?
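For comparison, the revision quoted in the 2020-10-29 comment settles on a simpler rule. It keeps the input when the parallelism is unchanged or the changelog is insert-only. Otherwise it keys by the primary key, and it fails if there is no primary key. Below is a minimal Java sketch of that rule. It assumes a local `RowKind` enum standing in for `org.apache.flink.types.RowKind`, and it uses `IllegalStateException` in place of `TableException`. The class and method names are hypothetical.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the sink-input decision in the later CommonPhysicalSink revision. */
final class SinkInputDecision {
    // Local stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Action { KEEP, KEY_BY_PRIMARY_KEY }

    private SinkInputDecision() {}

    static Action decide(int inputParallelism, int sinkParallelism,
                         Set<RowKind> changelogKinds, List<Integer> primaryKeys) {
        // Parallelism unchanged: the input transformation is used as-is.
        if (inputParallelism == sinkParallelism) {
            return Action.KEEP;
        }
        // Insert-only changelog (like ChangelogMode.containsOnly(INSERT)): no keying.
        if (changelogKinds.equals(EnumSet.of(RowKind.INSERT))) {
            return Action.KEEP;
        }
        // Any other changelog needs a primary key to route rows with the same key together.
        if (primaryKeys.isEmpty()) {
            throw new IllegalStateException("changelog mode " + changelogKinds
                + " is not insert-only and the sink parallelism differs from the input,"
                + " but no primary key is defined");
        }
        return Action.KEY_BY_PRIMARY_KEY;
    }
}
```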









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
/**
 * Helper method for creating a static provider.
 */
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-   return () -> sinkFunction;
+   return of(sinkFunction, Optional.empty());
+   }
+
+   /**
+* Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+*/
+   static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
   @JingsongLi
   > it is recommended to use the Optional only in method return values

   Copy that.

   > I think we don't need provide method.

   Well, since `SinkFunctionProvider` now implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? I'd appreciate it if you could let me know.
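Here is one possible shape that keeps `Optional` only as a return value. The factory takes a nullable `Integer`, and `getParallelism()` wraps it with `Optional.ofNullable`. This is a Flink-free sketch: `ParallelismProvider` and `SinkProvider` are local stand-ins rather than the real Flink interfaces, and the type parameter `F` stands in for `SinkFunction<RowData>`.

```java
import java.util.Optional;

/** Hypothetical, Flink-free sketch of a provider that optionally carries a sink parallelism. */
final class ProviderSketch {
    // Local stand-in for ParallelismProvider; Optional appears only as a return type.
    interface ParallelismProvider {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    // Local stand-in for SinkFunctionProvider; F stands in for SinkFunction<RowData>.
    interface SinkProvider<F> extends ParallelismProvider {
        F createSinkFunction();

        static <F> SinkProvider<F> of(F sinkFunction) {
            return of(sinkFunction, null);
        }

        /** A null parallelism means "not configured", so the planner keeps its own default. */
        static <F> SinkProvider<F> of(F sinkFunction, Integer parallelism) {
            return new SinkProvider<F>() {
                @Override
                public F createSinkFunction() {
                    return sinkFunction;
                }

                @Override
                public Optional<Integer> getParallelism() {
                    return Optional.ofNullable(parallelism);
                }
            };
        }
    }

    private ProviderSketch() {}
}
```

The single-argument factory stays source-compatible for existing connectors. Only connectors that expose a parallelism option would call the two-argument overload.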









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165608



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism

Review comment:
   Since the conversation below concludes that this check is not needed, it is fine to remove this line.









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165407



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism

Review comment:
   Well, correct me if I am wrong: `getMaxParallelism` is the upper bound on the parallelism that the task can use, while `getParallelism` is the parallelism the task actually uses.
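To illustrate the distinction: the max parallelism is the number of key groups, and it bounds how far a keyed operator can later be rescaled. When it is not set explicitly, Flink derives it from the operator parallelism. The sketch below reproduces that derivation as I understand `KeyGroupRangeAssignment.computeDefaultMaxParallelism`: it rounds `1.5 × parallelism` up to a power of two and clamps the result to [128, 32768]. Treat the constants and the exact formula as assumptions to check against the Flink version in use. The class name is hypothetical.

```java
/** Hypothetical re-implementation of Flink's default max-parallelism derivation, for illustration only. */
final class MaxParallelismSketch {
    static final int LOWER_BOUND = 1 << 7;  // 128
    static final int UPPER_BOUND = 1 << 15; // 32768

    private MaxParallelismSketch() {}

    /** Smallest power of two that is >= x, for x >= 1. */
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    static int defaultMaxParallelism(int operatorParallelism) {
        if (operatorParallelism <= 0 || operatorParallelism > UPPER_BOUND) {
            throw new IllegalArgumentException("invalid parallelism: " + operatorParallelism);
        }
        // Leave ~50% headroom for scaling out, then clamp to the allowed range.
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }
}
```

A sink with parallelism 100 would therefore get 256 key groups by default, while any parallelism up to 85 stays at the lower bound of 128.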









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513164185



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()

Review comment:
   My concern is that if a new provider that does not implement `ParallelismProvider` is used, casting it without first checking its type will throw an exception.
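Below is a minimal sketch of the defensive variant, using local stand-in interfaces and hypothetical names. The provider is checked with `instanceof` before the cast. A provider without `ParallelismProvider` falls back to the input parallelism, matching the `else inputParallelism` branch in the diff.

```java
import java.util.Optional;

/** Hypothetical sketch: read the sink parallelism without assuming every provider implements ParallelismProvider. */
final class ParallelismResolution {
    // Local stand-ins for the Flink interfaces discussed in this thread.
    interface SinkRuntimeProvider {}

    interface ParallelismProvider {
        Optional<Integer> getParallelism();
    }

    private ParallelismResolution() {}

    static int resolve(SinkRuntimeProvider provider, int inputParallelism) {
        // An unchecked cast would fail with ClassCastException for a provider lacking the interface.
        if (provider instanceof ParallelismProvider) {
            return ((ParallelismProvider) provider).getParallelism().orElse(inputParallelism);
        }
        return inputParallelism;
    }
}
```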









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-27 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513163662



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")

Review comment:
   Sure. By the way, should we also throw `TableException` here instead of `RuntimeException`?
   @JingsongLi
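For what it's worth, Flink's `TableException` extends `RuntimeException`. Switching the exception type should therefore not break callers that already catch `RuntimeException`. Below is a minimal sketch with a local stand-in class and hypothetical names. The message includes the table identifier, as the later revision's message does.

```java
/** Hypothetical sketch of the sink-parallelism check throwing a table-specific exception. */
final class SinkParallelismCheck {
    // Local stand-in for org.apache.flink.table.api.TableException, which also extends RuntimeException.
    static final class TableException extends RuntimeException {
        TableException(String message) {
            super(message);
        }
    }

    private SinkParallelismCheck() {}

    static int validate(String tableIdentifier, int configuredParallelism) {
        if (configuredParallelism <= 0) {
            throw new TableException(String.format(
                "Table: %s configured sink parallelism: %d should be greater than zero.",
                tableIdentifier, configuredParallelism));
        }
        return configuredParallelism;
    }
}
```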









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-25 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511716192



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.
+case (_, _) if(containedRowKinds == Set(RowKind.DELETE)) => inputTransformation
+case (_, _) if(containedRowKinds == Set(RowKind.INSERT)) => inputTransformation
+// fixme: for retract mode (insert and delete contains only), is there somethinng to do with? Currently do nothing.
+case (_, _) if(containedRowKinds == Set(RowKind.INSERT,RowKind.DELETE)) => inputTransformation
+case (_, Nil) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => throw new RuntimeException(s"ChangelogMode contains ${RowKind.UPDATE_AFTER}, but no primaryKeys were found")
+case (_, _) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => new DataStream[RowData](env,inputTransformation).keyBy(primaryKeys:_*).getTransformation
+case _ => throw new RuntimeException(s"the changelogMode is: ${containedRowKinds.mkString(",")}, which is not supported")
+  }
+

Review comment:
   I enumerated all the changelog mode cases using Scala pattern matching, so that each case can easily be modified in its own scope.
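Below is a rough Java rendering of the enumerated cases, with one branch per case, to show how each can be changed independently. It assumes local stand-ins for `RowKind` and for the resulting action. It uses `IllegalStateException` and `UnsupportedOperationException` in place of `RuntimeException`. It is meant to be called only when input and sink parallelism differ. The names are hypothetical.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical Java rendering of the case enumeration in the 2020-10-25 revision. */
final class ChangelogCases {
    // Local stand-in for org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Action { KEEP, KEY_BY_PRIMARY_KEY }

    private ChangelogCases() {}

    static Action decide(Set<RowKind> kinds, List<Integer> primaryKeys) {
        if (kinds.equals(EnumSet.of(RowKind.DELETE))) {
            return Action.KEEP; // delete-only: left unchanged (open question in the review)
        }
        if (kinds.equals(EnumSet.of(RowKind.INSERT))) {
            return Action.KEEP; // insert-only: left unchanged
        }
        if (kinds.equals(EnumSet.of(RowKind.INSERT, RowKind.DELETE))) {
            return Action.KEEP; // insert + delete only: left unchanged (open question in the review)
        }
        if (kinds.contains(RowKind.UPDATE_AFTER)) {
            if (primaryKeys.isEmpty()) {
                throw new IllegalStateException("changelog mode contains UPDATE_AFTER, but no primary key was found");
            }
            return Action.KEY_BY_PRIMARY_KEY; // upsert-style updates: key by the primary key
        }
        throw new UnsupportedOperationException("unsupported changelog mode: " + kinds);
    }
}
```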









[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

2020-10-25 Thread GitBox


shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511713970



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
 val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+val inputParallelism = inputTransformation.getParallelism
+val taskParallelism = env.getParallelism
+val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+else inputParallelism
+
+if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+val containedRowKinds = changelogMode.getContainedKinds.toSet
+val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+else (containedRowKinds, primaryKeys.toList) match {
+// fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.

Review comment:
   Actually, this part needs further discussion.
   From my perspective, each changelog mode should be treated differently. So far I have only worked out what should be done in `UPSERT` mode and `INSERT_ONLY` mode.
   
   What should we do for the other changelog modes?
   WDYT?








