jaehwan0214 commented on a change in pull request #299:
URL: https://github.com/apache/incubator-nemo/pull/299#discussion_r503365424



##########
File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming 
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, 
OutputT>> {
  private static final Logger LOG = LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
  // Reduce function handed to the grouping DoFn that wrapDoFn creates.
  private final SystemReduceFn reduceFn;
  // In-memory timer/state internals factories: created lazily by wrapDoFn and reused on later calls.
  private transient InMemoryTimerInternalsFactory<K> inMemoryTimerInternalsFactory;
  private transient InMemoryStateInternalsFactory<K> inMemoryStateInternalsFactory;
  // Last watermark emitted downstream by emitOutputWatermark; starts at Long.MIN_VALUE.
  private volatile Watermark prevOutputWatermark;
  // Per-key watermark holds; the smallest one caps the output watermark in emitOutputWatermark.
  // NOTE(review): volatile only publishes the reference; the HashMap assigned in the constructor
  // is not itself thread-safe — confirm it is only touched from one thread.
  private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
  // Latest input watermark; onWatermark returns early for watermarks that do not exceed it.
  private volatile Watermark inputWatermark;
  // The unwrapped collector most recently passed to wrapOutputCollector.
  private transient OutputCollector originOc;
  // Set to true by onData; decides how an empty hold map is treated on the first pass of
  // emitOutputWatermark.
  private volatile boolean dataReceived = false;
+
+  public GBKStreamingTransform(final Coder<K> keyCoder,
+                           final Map<TupleTag<?>, Coder<?>> outputCoders,
+                           final TupleTag<KV<K, OutputT>> mainOutputTag,
+                           final WindowingStrategy<?, ?> windowingStrategy,
+                           final PipelineOptions options,
+                           final SystemReduceFn reduceFn,
+                           final DoFnSchemaInformation doFnSchemaInformation,
+                           final DisplayData displayData) {
+    super(null,
+      null,
+      outputCoders,
+      mainOutputTag,
+      Collections.emptyList(),  /* no additional outputs */
+      windowingStrategy,
+      Collections.emptyMap(), /* no additional side inputs */
+      options,
+      displayData,
+      doFnSchemaInformation,
+      Collections.<String, PCollectionView<?>>emptyMap()); /* does not have 
side inputs */
+    this.reduceFn = reduceFn;
+    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+    this.inputWatermark = new Watermark(Long.MIN_VALUE);
+    this.keyAndWatermarkHoldMap = new HashMap<>();
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * @param doFn original doFn.
+   * @return GroupAlsoByWindowViaWindowSetNewDoFn
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    if (inMemoryStateInternalsFactory == null) {
+      this.inMemoryStateInternalsFactory = new 
InMemoryStateInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryStateInternalFactroy is already set");
+    }
+
+    if (inMemoryTimerInternalsFactory == null) {
+      this.inMemoryTimerInternalsFactory = new 
InMemoryTimerInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryTimerInternalsFactory is already set");
+    }
+
+    // This function performs group by key and window operation.
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
+        null, // does not have side input.
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  /** Wrapper function of output collector. */
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    originOc = oc;
+    return new GBKOutputCollector(oc);
+  }
+
+  /**
+   * Every time a single element arrives, this method invokes runner to 
process a single element.
+   * The collected data are emitted at {@link 
GBKStreamingTransform#onWatermark(Watermark)}
+   * @param element input data element.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+      dataReceived = true;
+      try {
+        checkAndInvokeBundle();
+        final KV<K, InputT> kv = element.getValue();
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.elementsWorkItem(kv.getKey(),
+            Collections.singletonList(element.withValue(kv.getValue())));
+        
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+        checkAndFinishBundle();
+      } catch (final Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException("exception trigggered element " + 
element.toString());
+      }
+  }
+
+  /**
+   * Process the collected data, trigger timers, and emit watermark to 
downstream operators.
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   * @param triggerWatermark watermark
+   */
+  private void processElementsAndTriggerTimers(final Instant processingTime,
+                                               final Instant synchronizedTime,
+                                               final Watermark 
triggerWatermark) {
+    triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+    emitOutputWatermark();
+  }
+
+  /**
+   * Emit watermark to downstream operators.
+   * Output watermark = max(prev output watermark, min(input watermark, 
watermark holds)).
+   */
+  private void emitOutputWatermark() {
+    // Find min watermark hold
+    Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+      : Collections.min(keyAndWatermarkHoldMap.values());
+
+    Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+
+    while (outputWatermarkCandidate.getTimestamp() > 
prevOutputWatermark.getTimestamp()) {
+      // Progress
+      prevOutputWatermark = outputWatermarkCandidate;
+      // Emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds
+      if (minWatermarkHold.getTimestamp() == 
outputWatermarkCandidate.getTimestamp()) {
+        final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+        keyAndWatermarkHoldMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == 
minWatermarkTimestamp);
+      }
+
+      minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+        ? new Watermark(Long.MAX_VALUE) : 
Collections.min(keyAndWatermarkHoldMap.values());
+
+      outputWatermarkCandidate = new Watermark(
+        Math.max(prevOutputWatermark.getTimestamp(),
+          Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
+    }
+  }
+
+  /**
+   * Trigger timers that need to be fired.
+   * @param watermark watermark
+   */
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+      return;

Review comment:
       I agree. Fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to