jaehwan0214 commented on a change in pull request #299:
URL: https://github.com/apache/incubator-nemo/pull/299#discussion_r504359453
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
##########
@@ -490,30 +542,32 @@ private static AbstractDoFnTransform
createDoFnTransform(final PipelineTranslati
}
/**
- * Create a group by key transform.
- * It returns GroupByKeyAndWindowDoFnTransform if window function is not
default.
+ * Returns the correct type of GroupByKey transform by checking whether
global windowing strategy is used.
*
* @param ctx translation context
* @param beamNode the beam node to be translated
- * @return group by key transform
+ * @return GroupByKey transform
*/
private static Transform createGBKTransform(
final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode) {
- final AppliedPTransform pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
+ final AppliedPTransform<?, ?, ?> pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
if (isGlobalWindow(beamNode, ctx.getPipeline())) {
+ // GroupByKey Transform for batch data
Review comment:
Thank you. Fixed it.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
##########
@@ -490,30 +542,32 @@ private static AbstractDoFnTransform
createDoFnTransform(final PipelineTranslati
}
/**
- * Create a group by key transform.
- * It returns GroupByKeyAndWindowDoFnTransform if window function is not
default.
+ * Returns the correct type of GroupByKey transform by checking whether
global windowing strategy is used.
*
* @param ctx translation context
* @param beamNode the beam node to be translated
- * @return group by key transform
+ * @return GroupByKey transform
*/
private static Transform createGBKTransform(
final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode) {
- final AppliedPTransform pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
+ final AppliedPTransform<?, ?, ?> pTransform =
beamNode.toAppliedPTransform(ctx.getPipeline());
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
if (isGlobalWindow(beamNode, ctx.getPipeline())) {
+ // GroupByKey Transform for batch data
return new GroupByKeyTransform();
} else {
- return new GroupByKeyAndWindowDoFnTransform(
+ // GroupByKey Transform for streaming data
Review comment:
Thank you. Fixed it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]