[GitHub] [flink] xintongsong commented on a change in pull request #14860: [FLINK-21269] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements
xintongsong commented on a change in pull request #14860: URL: https://github.com/apache/flink/pull/14860#discussion_r569980985 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -158,6 +159,9 @@ private boolean shouldExecuteInBatchMode; +// Records the slot sharing groups and their corresponding ResourceProfile +private Map slotSharingGroupResources = new HashMap<>(); Review comment: `final` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java ## @@ -47,6 +48,9 @@ */ private ResourceSpec resourceSpec = ResourceSpec.ZERO; Review comment: It could be confusing what's the differences between the original `resourceSpec` and the new introduced `resourceProfile`. I would suggest to mark this field and its getter `@Deprecated`. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) { return this; } +/** + * Specify fine-grained resource requirements for slot sharing groups. + * + * Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operator together. In cases grouped operators are deployed into separate slots, the + * slot resources will be derived from the specified group requirement. Review comment: `group requirement` -> `group requirements` ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) { graph.setTimeCharacteristic(timeCharacteristic); graph.setJobName(jobName); graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING); + graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources)); Review comment: Not sure if this is necessary to make it unmodifiable. I would not expect the generator to maintain certain states and protect them from being accidentally modified. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) { return this; } +/** + * Specify fine-grained resource requirements for slot sharing groups. + * + * Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operator together. In cases grouped operators are deployed into separate slots, the Review comment: `grouped operator` -> `grouped operators` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java ## @@ -47,6 +48,9 @@ */ private ResourceSpec resourceSpec = ResourceSpec.ZERO; +// Represents resources of all tasks in the group. Default to be null. Review comment: `Default to be UNKNOWN`. ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ## @@ -1195,6 +1197,55 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled( assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex); } +@Test +public void testSlotSharingResourceConfiguration() { +final String slotSharingGroup1 = "slot-a"; +final String slotSharingGroup2 = "slot-b"; +final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); +final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); +final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30); +final Map slotSharingGroupResource = new HashMap<>(); +slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1); +slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2); +slotSharingGroupResource.put( +StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3); + +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.fromElements(1, 2, 3) +.name(slotSharingGroup1) +.slotSharingGroup(slotSharingGroup1) +.map(x -> x + 1) +.name(slotSharingGroup2) +.slotSharingGroup(slotSharingGroup2) +.map(x -> x * x) +
[GitHub] [flink] xintongsong commented on a change in pull request #14860: [FLINK-21269] Introduce runtime interfaces for specifying SlotSharingGroup-based resource requirements
xintongsong commented on a change in pull request #14860: URL: https://github.com/apache/flink/pull/14860#discussion_r569980985 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -158,6 +159,9 @@ private boolean shouldExecuteInBatchMode; +// Records the slot sharing groups and their corresponding ResourceProfile +private Map slotSharingGroupResources = new HashMap<>(); Review comment: `final` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java ## @@ -47,6 +48,9 @@ */ private ResourceSpec resourceSpec = ResourceSpec.ZERO; Review comment: It could be confusing what's the differences between the original `resourceSpec` and the new introduced `resourceProfile`. I would suggest to mark this field and its getter `@Deprecated`. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) { return this; } +/** + * Specify fine-grained resource requirements for slot sharing groups. + * + * Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operator together. In cases grouped operators are deployed into separate slots, the + * slot resources will be derived from the specified group requirement. Review comment: `group requirement` -> `group requirements` ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -291,6 +309,7 @@ private void configureStreamGraph(final StreamGraph graph) { graph.setTimeCharacteristic(timeCharacteristic); graph.setJobName(jobName); graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING); + graph.setSlotSharingGroupResource(Collections.unmodifiableMap(slotSharingGroupResources)); Review comment: Not sure if this is necessary to make it unmodifiable. I would not expect the generator to maintain certain states and protect them from being accidentally modified. ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -259,6 +263,20 @@ public StreamGraphGenerator setJobName(String jobName) { return this; } +/** + * Specify fine-grained resource requirements for slot sharing groups. + * + * Note that a slot sharing group hints the scheduler that the grouped operators CAN be + * deployed into a shared slot. There's no guarantee that the scheduler always deploy the + * grouped operator together. In cases grouped operators are deployed into separate slots, the Review comment: `grouped operator` -> `grouped operators` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java ## @@ -47,6 +48,9 @@ */ private ResourceSpec resourceSpec = ResourceSpec.ZERO; +// Represents resources of all tasks in the group. Default to be null. Review comment: `Default to be UNKNOWN`. ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ## @@ -1195,6 +1197,55 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled( assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex); } +@Test +public void testSlotSharingResourceConfiguration() { +final String slotSharingGroup1 = "slot-a"; +final String slotSharingGroup2 = "slot-b"; +final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); +final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); +final ResourceProfile resourceProfile3 = ResourceProfile.fromResources(3, 30); +final Map slotSharingGroupResource = new HashMap<>(); +slotSharingGroupResource.put(slotSharingGroup1, resourceProfile1); +slotSharingGroupResource.put(slotSharingGroup2, resourceProfile2); +slotSharingGroupResource.put( +StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, resourceProfile3); + +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.fromElements(1, 2, 3) +.name(slotSharingGroup1) +.slotSharingGroup(slotSharingGroup1) +.map(x -> x + 1) +.name(slotSharingGroup2) +.slotSharingGroup(slotSharingGroup2) +.map(x -> x * x) +