jaehwan0214 commented on a change in pull request #299:
URL: https://github.com/apache/incubator-nemo/pull/299#discussion_r504079925



##########
File path: examples/beam/debug/errored_ir.json
##########
@@ -0,0 +1 @@
+{"vertices":[{"id":"vertex13","properties":{"class":"BeamBoundedSourceVertex","executionProperties":{"org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty":"None"},"source":"[]org.apache.beam.sdk.io.Read$Bounded:source=org.apache.beam.sdk.io.CompressedSource\n[source]org.apache.beam.sdk.io.CompressedSource:source=org.apache.beam.sdk.io.TextSource\n[source/source]org.apache.beam.sdk.io.TextSource:filePattern=/Users/backjaehwan/new/incubator-nemo/examples/beam/../resources/inputs/test_input_windowed_wordcount\n[source]org.apache.beam.sdk.io.CompressedSource:compressionMode=AUTO"}},{"id":"vertex14","properties":{"class":"OperatorVertex","executionProperties":{"org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty":"None"},"transform":"DoFnTransform
 / []org.apache.nemo.compiler.frontend.beam.PipelineTranslator / 
name=ParDo(Anonymous)/ParMultiDo(Anonymous)"}},{"id":"vertex15","properties":{"class":"OperatorVertex","executionProperties":{"org.a
 
pache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty":"None"},"transform":"DoFnTransform
 / []org.apache.nemo.compiler.frontend.beam.PipelineTranslator / 
name=MapElements/Map/ParMultiDo(Anonymous)"}},{"id":"vertex16","properties":{"class":"OperatorVertex","executionProperties":{"org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty":"None"},"transform":"WindowFnTransform
 / [windowFn]org.apache.beam.sdk.transforms.windowing.FixedWindows / 
size=5000\n[]org.apache.beam.sdk.transforms.windowing.Window$Assign / 
windowFn=org.apache.beam.sdk.transforms.windowing.FixedWindows"}},{"id":"vertex17","properties":{"class":"OperatorVertex","executionProperties":{"org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty":"None"},"transform":"GBKFinalTransform
 / []org.apache.beam.sdk.transforms.Combine$PerKey / 
combineFn=org.apache.beam.sdk.transforms.Sum$SumLongFn"}}],"edges":[{"src":"vertex16","dst":"vertex17","properties":{"id":"edge14","
 
executionProperties":{"org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty":"org.apache.nemo.compiler.frontend.beam.BeamKeyExtractor@845","org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty":"ONE_TO_ONE","org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty":"Pair[INTACT,0]","org.apache.nemo.common.ir.edge.executionproperty.KeyEncoderProperty":"org.apache.beam.sdk.coders.StringUtf8Coder","org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty":"PUSH","org.apache.nemo.common.ir.edge.executionproperty.KeyDecoderProperty":"org.apache.beam.sdk.coders.StringUtf8Coder","org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty":"MEMORY_STORE"}}},{"src"
 
:"vertex14","dst":"vertex15","properties":{"id":"edge12","executionProperties":{"org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty":"org.apache.nemo.compiler.frontend.beam.BeamKeyExtractor@845","org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty":"ONE_TO_ONE","org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty":"Pair[INTACT,0]","org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty":"PUSH","org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty":"MEMORY_STORE"}}},{"src":"vertex15","dst":"vertex16","properties":{"id":"edge13","executionProperties":{"org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty":"org.apache.beam.sdk.util
 
.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty":"org.apache.nemo.compiler.frontend.beam.BeamKeyExtractor@845","org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty":"ONE_TO_ONE","org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty":"Pair[INTACT,0]","org.apache.nemo.common.ir.edge.executionproperty.KeyEncoderProperty":"org.apache.beam.sdk.coders.StringUtf8Coder","org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty":"PUSH","org.apache.nemo.common.ir.edge.executionproperty.KeyDecoderProperty":"org.apache.beam.sdk.coders.StringUtf8Coder","org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty":"MEMORY_STORE"}}},{"src":"vertex13","dst":"vertex14","properties":{"id":"edge11","executionProperties":{"org.apache.nemo.common.ir.edge.ex
 
ecutionproperty.EncoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty":"org.apache.nemo.compiler.frontend.beam.BeamKeyExtractor@845","org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty":"ONE_TO_ONE","org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty":"Pair[INTACT,0]","org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty":"org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder","org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty":"PUSH","org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty":"MEMORY_STORE"}}}]}

Review comment:
       Thank you. Fixed it.

##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.*;
+
+/**
+ * InMemoryTimerInternalsFactory.
+ * @param <K> key type
+ */
+public final class InMemoryTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryTimerInternalsFactory.class.getName());
+
+  private Map<K, InMemoryTimerInternals> timerInternalsMap;

Review comment:
       Thank you. Fixed it.

##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InMemoryStateInternalsFactory.
+ * @param <K> key type
+ */
+public final class InMemoryStateInternalsFactory<K> implements 
StateInternalsFactory<K> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryStateInternalsFactory.class.getName());
+
+  private Map<K, StateInternals> stateInternalMap;
+
+  public InMemoryStateInternalsFactory() {
+    this.stateInternalMap = new HashMap<>();

Review comment:
       Thank you. Fixed it.

##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InMemoryStateInternalsFactory.
+ * @param <K> key type
+ */
+public final class InMemoryStateInternalsFactory<K> implements 
StateInternalsFactory<K> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryStateInternalsFactory.class.getName());
+
+  private Map<K, StateInternals> stateInternalMap;

Review comment:
       Thank you. Fixed it.

##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation when input 
data is unbounded or is not in
+ * global window.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKTransform<K, InputT, OutputT>
+  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, 
OutputT>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GBKTransform.class.getName());
+  private final SystemReduceFn reduceFn;
+  private transient InMemoryTimerInternalsFactory<K> 
inMemoryTimerInternalsFactory;
+  private transient InMemoryStateInternalsFactory<K> 
inMemoryStateInternalsFactory;
+  private Watermark prevOutputWatermark;
+  private Map<K, Watermark> keyOutputWatermarkMap;
+  private Watermark inputWatermark;
+  private transient OutputCollector originOc;
+  private boolean dataReceived = false;
+
+  public GBKTransform(final Coder<K> keyCoder, // NOTE(review): keyCoder is never read in this constructor body
+                      final Map<TupleTag<?>, Coder<?>> outputCoders,
+                      final TupleTag<KV<K, OutputT>> mainOutputTag,
+                      final WindowingStrategy<?, ?> windowingStrategy,
+                      final PipelineOptions options,
+                      final SystemReduceFn reduceFn,
+                      final DoFnSchemaInformation doFnSchemaInformation,
+                      final DisplayData displayData) {
+    super(null, // the first two parent-constructor arguments are deliberately null
+      null,
+      outputCoders,
+      mainOutputTag,
+      Collections.emptyList(),  /* no additional outputs */
+      windowingStrategy,
+      Collections.emptyMap(), /* no additional side inputs */
+      options,
+      displayData,
+      doFnSchemaInformation,
+      Collections.<String, PCollectionView<?>>emptyMap()); /* does not have 
side inputs */
+    this.reduceFn = reduceFn; // handed to GroupAlsoByWindowViaWindowSetNewDoFn.create in wrapDoFn
+    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE); // lowest value: any larger candidate counts as progress
+    this.inputWatermark = new Watermark(Long.MIN_VALUE); // lowest value: onWatermark requires strictly larger timestamps
+    this.keyOutputWatermarkMap = new HashMap<>(); // per-key watermark holds read by emitOutputWatermark
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * The state and timer internals factories are created only when they are still {@code null};
+   * otherwise the existing instances are reused and an info message is logged.
+   * The {@code doFn} argument is not used by this implementation.
+   * @param doFn original doFn (ignored).
+   * @return the DoFn returned by {@code GroupAlsoByWindowViaWindowSetNewDoFn.create}.
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    if (inMemoryStateInternalsFactory == null) {
+      this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory<>();
+    } else {
+      // Message spells the real class name so it can be found by searching the logs.
+      LOG.info("InMemoryStateInternalsFactory is already set");
+    }
+
+    if (inMemoryTimerInternalsFactory == null) {
+      this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryTimerInternalsFactory is already set");
+    }
+
+    // This function performs group by key and window operation.
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
+        null, // does not have side input.
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  /**
+   * Wrapper function of output collector.
+   * Keeps a reference to the given collector in {@code originOc} and returns a new
+   * {@code GBKOutputCollector} constructed around it.
+   * @param oc the output collector to wrap.
+   * @return a {@code GBKOutputCollector} constructed with {@code oc}.
+   */
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    originOc = oc;
+    return new GBKOutputCollector(oc);
+  }
+
+  /**
+   * Every time a single element arrives, this method invokes runner to process a single element.
+   * The element is wrapped into a single-element {@code KeyedWorkItem} keyed by the element's key
+   * and handed to the DoFn runner inside the global window.
+   * The collected data are emitted at {@link GBKTransform#onWatermark(Watermark)}.
+   * @param element input data element.
+   * @throws RuntimeException if processing fails; the original exception is attached as the cause.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+    dataReceived = true;
+    try {
+      checkAndInvokeBundle();
+      final KV<K, InputT> kv = element.getValue();
+      final KeyedWorkItem<K, InputT> keyedWorkItem =
+        KeyedWorkItems.elementsWorkItem(kv.getKey(),
+          Collections.singletonList(element.withValue(kv.getValue())));
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      checkAndFinishBundle();
+    } catch (final Exception e) {
+      // Chain the cause so the original stack trace travels with the rethrown exception
+      // instead of only being printed to stderr.
+      throw new RuntimeException("exception trigggered element " + element.toString(), e);
+    }
+  }
+
+  /**
+   * Triggers timers with the given times and watermark, then emits the output watermark
+   * to downstream operators.
+   * @param processingTime processing time passed on to {@code triggerTimers}.
+   * @param synchronizedTime synchronized processing time passed on to {@code triggerTimers}.
+   * @param triggerWatermark watermark passed on to {@code triggerTimers}.
+   */
+  private void processElementsAndTriggerTimers(final Instant processingTime,
+                                               final Instant synchronizedTime,
+                                               final Watermark 
triggerWatermark) {
+    triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+    emitOutputWatermark();
+  }
+
+  /**
+   * Emit watermark to downstream operators.
+   * Output watermark = max(prev output watermark, min(input watermark, watermark holds)).
+   * The candidate is recomputed and emitted repeatedly for as long as it is strictly greater
+   * than the previously emitted watermark; holds equal to an emitted watermark are removed from
+   * {@code keyOutputWatermarkMap} before the next candidate is computed.
+   */
+  private void emitOutputWatermark() {
+    // Find min watermark hold; with no holds, use MIN_VALUE once data has been received and MAX_VALUE otherwise
+    Watermark minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+      : Collections.min(keyOutputWatermarkMap.values());
+
+    Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+
+    while (outputWatermarkCandidate.getTimestamp() > 
prevOutputWatermark.getTimestamp()) {
+      // Progress: remember the candidate as the latest emitted watermark
+      prevOutputWatermark = outputWatermarkCandidate;
+      // Emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds, but only when the emitted watermark equals the minimum hold
+      if (minWatermarkHold.getTimestamp() == 
outputWatermarkCandidate.getTimestamp()) {
+        final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+        keyOutputWatermarkMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == 
minWatermarkTimestamp);
+      }
+
+      minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+        ? new Watermark(Long.MAX_VALUE) : 
Collections.min(keyOutputWatermarkMap.values());
+
+      outputWatermarkCandidate = new Watermark(
+        Math.max(prevOutputWatermark.getTimestamp(),
+          Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+    }
+  }
+
+  /**
+   * Trigger timers that need to be fired at {@code watermark}.
+   * The received watermark becomes the new input watermark; timers are then triggered using the
+   * current wall-clock time ({@code Instant.now()}) for both processing-time arguments.
+   * @param watermark watermark; its timestamp must be strictly greater than the current input watermark.
+   * @throws RuntimeException if the watermark's timestamp is less than or equal to the current input
+   *         watermark, or wrapping any exception raised while triggering timers.
+   */
+  @Override
+  public void onWatermark(final Watermark watermark) throws RuntimeException {
+    if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+      throw new RuntimeException("Received watermark is before inputWatermark 
in GBKTransform");
+    }
+    checkAndInvokeBundle();
+    inputWatermark = watermark;
+    // Triggering timers
+    try {
+      processElementsAndTriggerTimers(Instant.now(), Instant.now(), 
inputWatermark);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    checkAndFinishBundle();
+  }
+
+  /**
+   * This advances the input watermark and processing time to the timestamp max value
+   * in order to emit all data.
+   * {@code BoundedWindow.TIMESTAMP_MAX_VALUE} is used for the input watermark and for both the
+   * processing time and the synchronized processing time.
+   */
+  @Override
+  protected void beforeClose() {
+    // Finish any pending windows by advancing the input watermark to infinity.
+    inputWatermark = new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    processElementsAndTriggerTimers(
+      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, 
inputWatermark);
+  }
+
+  /**
+   * Trigger timers. When triggering, it emits the windowed data to downstream 
operators.
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   * @param watermark watermark
+   */
+  private void triggerTimers(final Instant processingTime,
+                            final Instant synchronizedTime,
+                            final Watermark watermark) {
+
+    Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
+      
inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
+    while (iter.hasNext()) {
+      final Map.Entry<K, InMemoryTimerInternals> curr = iter.next();
+      try {
+        curr.getValue().advanceInputWatermark(new 
Instant(watermark.getTimestamp()));
+        curr.getValue().advanceProcessingTime(processingTime);
+        curr.getValue().advanceSynchronizedProcessingTime(synchronizedTime);
+      } catch (final Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException();
+      }
+      for (TimeDomain domain : TimeDomain.values()) {

Review comment:
       Thank you. Fixed it.

##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation when input 
data is unbounded or is not in
+ * global window.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKTransform<K, InputT, OutputT>
+  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, 
OutputT>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GBKTransform.class.getName());
+  private final SystemReduceFn reduceFn;
+  private transient InMemoryTimerInternalsFactory<K> 
inMemoryTimerInternalsFactory;
+  private transient InMemoryStateInternalsFactory<K> 
inMemoryStateInternalsFactory;
+  private Watermark prevOutputWatermark;
+  private Map<K, Watermark> keyOutputWatermarkMap;
+  private Watermark inputWatermark;
+  private transient OutputCollector originOc;
+  private boolean dataReceived = false;
+
+  public GBKTransform(final Coder<K> keyCoder, // NOTE(review): keyCoder is never read in this constructor body
+                      final Map<TupleTag<?>, Coder<?>> outputCoders,
+                      final TupleTag<KV<K, OutputT>> mainOutputTag,
+                      final WindowingStrategy<?, ?> windowingStrategy,
+                      final PipelineOptions options,
+                      final SystemReduceFn reduceFn,
+                      final DoFnSchemaInformation doFnSchemaInformation,
+                      final DisplayData displayData) {
+    super(null, // the first two parent-constructor arguments are deliberately null
+      null,
+      outputCoders,
+      mainOutputTag,
+      Collections.emptyList(),  /* no additional outputs */
+      windowingStrategy,
+      Collections.emptyMap(), /* no additional side inputs */
+      options,
+      displayData,
+      doFnSchemaInformation,
+      Collections.<String, PCollectionView<?>>emptyMap()); /* does not have 
side inputs */
+    this.reduceFn = reduceFn; // handed to GroupAlsoByWindowViaWindowSetNewDoFn.create in wrapDoFn
+    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE); // lowest value: any larger candidate counts as progress
+    this.inputWatermark = new Watermark(Long.MIN_VALUE); // lowest value: onWatermark requires strictly larger timestamps
+    this.keyOutputWatermarkMap = new HashMap<>(); // per-key watermark holds read by emitOutputWatermark
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * The state and timer internals factories are created only when they are still {@code null};
+   * otherwise the existing instances are reused and an info message is logged.
+   * The {@code doFn} argument is not used by this implementation.
+   * @param doFn original doFn (ignored).
+   * @return the DoFn returned by {@code GroupAlsoByWindowViaWindowSetNewDoFn.create}.
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    if (inMemoryStateInternalsFactory == null) {
+      this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory<>();
+    } else {
+      // Message spells the real class name so it can be found by searching the logs.
+      LOG.info("InMemoryStateInternalsFactory is already set");
+    }
+
+    if (inMemoryTimerInternalsFactory == null) {
+      this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryTimerInternalsFactory is already set");
+    }
+
+    // This function performs group by key and window operation.
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
+        null, // does not have side input.
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  /**
+   * Wrapper function of output collector.
+   * Keeps a reference to the given collector in {@code originOc} and returns a new
+   * {@code GBKOutputCollector} constructed around it.
+   * @param oc the output collector to wrap.
+   * @return a {@code GBKOutputCollector} constructed with {@code oc}.
+   */
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    originOc = oc;
+    return new GBKOutputCollector(oc);
+  }
+
+  /**
+   * Every time a single element arrives, this method invokes runner to process a single element.
+   * The element is wrapped into a single-element {@code KeyedWorkItem} keyed by the element's key
+   * and handed to the DoFn runner inside the global window.
+   * The collected data are emitted at {@link GBKTransform#onWatermark(Watermark)}.
+   * @param element input data element.
+   * @throws RuntimeException if processing fails; the original exception is attached as the cause.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+    dataReceived = true;
+    try {
+      checkAndInvokeBundle();
+      final KV<K, InputT> kv = element.getValue();
+      final KeyedWorkItem<K, InputT> keyedWorkItem =
+        KeyedWorkItems.elementsWorkItem(kv.getKey(),
+          Collections.singletonList(element.withValue(kv.getValue())));
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      checkAndFinishBundle();
+    } catch (final Exception e) {
+      // Chain the cause so the original stack trace travels with the rethrown exception
+      // instead of only being printed to stderr.
+      throw new RuntimeException("exception trigggered element " + element.toString(), e);
+    }
+  }
+
+  /**
+   * Triggers timers with the given times and watermark, then emits the output watermark
+   * to downstream operators.
+   * @param processingTime processing time passed on to {@code triggerTimers}.
+   * @param synchronizedTime synchronized processing time passed on to {@code triggerTimers}.
+   * @param triggerWatermark watermark passed on to {@code triggerTimers}.
+   */
+  private void processElementsAndTriggerTimers(final Instant processingTime,
+                                               final Instant synchronizedTime,
+                                               final Watermark 
triggerWatermark) {
+    triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+    emitOutputWatermark();
+  }
+
+  /**
+   * Emit watermark to downstream operators.
+   * Output watermark = max(prev output watermark, min(input watermark, watermark holds)).
+   * The candidate is recomputed and emitted repeatedly for as long as it is strictly greater
+   * than the previously emitted watermark; holds equal to an emitted watermark are removed from
+   * {@code keyOutputWatermarkMap} before the next candidate is computed.
+   */
+  private void emitOutputWatermark() {
+    // Find min watermark hold; with no holds, use MIN_VALUE once data has been received and MAX_VALUE otherwise
+    Watermark minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+      : Collections.min(keyOutputWatermarkMap.values());
+
+    Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+
+    while (outputWatermarkCandidate.getTimestamp() > 
prevOutputWatermark.getTimestamp()) {
+      // Progress: remember the candidate as the latest emitted watermark
+      prevOutputWatermark = outputWatermarkCandidate;
+      // Emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds, but only when the emitted watermark equals the minimum hold
+      if (minWatermarkHold.getTimestamp() == 
outputWatermarkCandidate.getTimestamp()) {
+        final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+        keyOutputWatermarkMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == 
minWatermarkTimestamp);
+      }
+
+      minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+        ? new Watermark(Long.MAX_VALUE) : 
Collections.min(keyOutputWatermarkMap.values());
+
+      outputWatermarkCandidate = new Watermark(
+        Math.max(prevOutputWatermark.getTimestamp(),
+          Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+    }
+  }
+
+  /**
+   * Trigger timers that need to be fired at {@code watermark}.
+   * @param watermark watermark
+   */
+  @Override
+  public void onWatermark(final Watermark watermark) throws RuntimeException {
+    if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+      throw new RuntimeException("Received watermark is before inputWatermark 
in GBKTransform");

Review comment:
       Thank you. Fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to