[GitHub] [flink] xintongsong commented on a change in pull request #14860: [FLINK-21269] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements

2021-02-05 Thread GitBox


xintongsong commented on a change in pull request #14860:
URL: https://github.com/apache/flink/pull/14860#discussion_r569980985



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -158,6 +159,9 @@
 
 private boolean shouldExecuteInBatchMode;
 
+// Records the slot sharing groups and their corresponding ResourceProfile
+private Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();

Review comment:
   `final`
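
   A minimal sketch of the intent, using plain JDK types. The class name,
   the `addResources` method, and the `String` values (standing in for
   `ResourceProfile`) are placeholders for illustration, not the actual
   Flink code. Marking the field `final` only fixes the reference; the
   builder-style setter can still fill the map.

```java
import java.util.HashMap;
import java.util.Map;

public class FinalFieldSketch {
    // 'final' prevents reassigning the field; the map contents stay mutable.
    // String values are a placeholder for ResourceProfile (assumption).
    private final Map<String, String> slotSharingGroupResources = new HashMap<>();

    // Hypothetical builder-style setter that copies entries into the field.
    public FinalFieldSketch addResources(Map<String, String> resources) {
        slotSharingGroupResources.putAll(resources);
        return this;
    }

    public int size() {
        return slotSharingGroupResources.size();
    }
}
```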

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##
@@ -47,6 +48,9 @@
  */
 private ResourceSpec resourceSpec = ResourceSpec.ZERO;

Review comment:
   The difference between the original `resourceSpec` and the newly 
introduced `resourceProfile` could be confusing. I would suggest marking 
this field and its getter `@Deprecated`.
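
   Roughly what I have in mind, as a sketch only. The nested `ResourceSpec`
   and `ResourceProfile` types below are empty stand-ins so the snippet
   compiles on its own, and the class name and getter names are
   assumptions rather than the real `SlotSharingGroup` members.

```java
public class SlotSharingGroupSketch {
    // Empty stand-ins for the Flink types (assumption, for self-containment).
    static final class ResourceSpec {
        static final ResourceSpec ZERO = new ResourceSpec();
    }

    static final class ResourceProfile {
        static final ResourceProfile UNKNOWN = new ResourceProfile();
    }

    /** @deprecated kept for compatibility; prefer the resource profile. */
    @Deprecated
    private ResourceSpec resourceSpec = ResourceSpec.ZERO;

    // Represents resources of all tasks in the group.
    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;

    /** @deprecated use {@link #getResourceProfile()} instead. */
    @Deprecated
    public ResourceSpec getResourceSpec() {
        return resourceSpec;
    }

    public ResourceProfile getResourceProfile() {
        return resourceProfile;
    }
}
```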

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
 return this;
 }
 
+/**
+ * Specify fine-grained resource requirements for slot sharing groups.
+ *
+ * Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+ * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+ * grouped operator together. In cases grouped operators are deployed into separate slots, the
+ * slot resources will be derived from the specified group requirement.

Review comment:
   `group requirement` -> `group requirements`

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) {
 graph.setTimeCharacteristic(timeCharacteristic);
 graph.setJobName(jobName);
 graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING);
+graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources));

Review comment:
   I'm not sure it is necessary to make this unmodifiable. I would not 
expect the generator to maintain state and protect it from accidental 
modification.
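
   For reference, a small JDK-only sketch of what the wrapper does and does
   not guarantee. The class name, method names, keys, and `Integer` values
   are placeholders, not Flink types. The wrapper rejects writes made
   through it, but it is only a view: later changes to the backing map
   remain visible to whoever holds the wrapper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapSketch {
    // Returns true if a write through the unmodifiable wrapper is rejected.
    static boolean wrapperRejectsWrites() {
        Map<String, Integer> backing = new HashMap<>();
        Map<String, Integer> view = Collections.unmodifiableMap(backing);
        try {
            view.put("slot-a", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Returns the size seen through the wrapper after the backing map changes.
    static int viewSizeAfterBackingChange() {
        Map<String, Integer> backing = new HashMap<>();
        Map<String, Integer> view = Collections.unmodifiableMap(backing);
        backing.put("slot-a", 1);
        return view.size();
    }
}
```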

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##
@@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) {
 return this;
 }
 
+/**
+ * Specify fine-grained resource requirements for slot sharing groups.
+ *
+ * Note that a slot sharing group hints the scheduler that the grouped operators CAN be
+ * deployed into a shared slot. There's no guarantee that the scheduler always deploy the
+ * grouped operator together. In cases grouped operators are deployed into separate slots, the

Review comment:
   `grouped operator` -> `grouped operators`

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
##
@@ -47,6 +48,9 @@
  */
 private ResourceSpec resourceSpec = ResourceSpec.ZERO;
 
+// Represents resources of all tasks in the group. Default to be null.

Review comment:
   `Default to be UNKNOWN`.

##
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##
@@ -1195,6 +1197,55 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
 assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
 }
 
+@Test
+public void testSlotSharingResourceConfiguration() {
+final String slotSharingGroup1 = "slot-a";
+final String slotSharingGroup2 = "slot-b";
+final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30);
+final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
+slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1);
+slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2);
+slotSharingGroupResource.put(
+StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3);
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.fromElements(1, 2, 3)
+.name(slotSharingGroup1)
+.slotSharingGroup(slotSharingGroup1)
+.map(x -> x + 1)
+.name(slotSharingGroup2)
+.slotSharingGroup(slotSharingGroup2)
+.map(x -> x * x)
+
