johnyangk commented on a change in pull request #196: [NEMO-253] Refactor
getInternal(Main/Additional)OutputMap in TaskExecutor
URL: https://github.com/apache/incubator-nemo/pull/196#discussion_r260591774
##########
File path:
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
##########
@@ -540,20 +545,34 @@ private boolean handleDataFetchers(final
List<DataFetcher> fetchers) {
return map;
}
- // TODO #253: Refactor getInternal(Main/Additional)OutputMap
- private Map<String, List<NextIntraTaskOperatorInfo>>
getInternalAdditionalOutputMap(
+ /**
+ * Return a map of Internal Outputs associated with their output tag.
+ * If an edge has no output tag, its info are added to the main key.
+ *
+ * @param irVertex source irVertex
+ * @param irVertexDag DAG of IRVertex and RuntimeEdge
+ * @param edgeIndexMap Map of edge and index
+ * @param operatorWatermarkManagerMap Map of irVertex and
InputWatermarkManager
+ * @return Map<OutputTag, List<NextIntraTaskOperatorInfo>>
+ */
+ private Map<String, List<NextIntraTaskOperatorInfo>> getInternalOutputMap(
final IRVertex irVertex,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final Map<Edge, Integer> edgeIndexMap,
final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
- // Add all intra-task additional tags to additional output map.
+ // Add all intra-task tags to additional output map.
final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
- .filter(edge ->
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
.map(edge -> {
- final String outputTag =
edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+ final boolean isPresent =
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent();
+ final String outputTag;
+ if (isPresent) {
+ outputTag =
edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+ } else {
+ outputTag = "main";
Review comment:
Maybe refer to `AdditionalOutputTagProperty.MAIN_OUTPUT_TAG`?
(See comment above)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services