taegeonum commented on a change in pull request #299:
URL: https://github.com/apache/incubator-nemo/pull/299#discussion_r503033513
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
##########
@@ -38,6 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
Review comment:
Please remove the empty line.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
Review comment:
Is this class only for streaming, or is it used for both streaming and batch?
If it is used for both, could you please update the comments and rename the class,
e.g., `GBKStreamingTransform` -> `GBKTransform`?
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
Review comment:
Please remove the `volatile` modifier.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
Review comment:
Please remove the `volatile` modifier.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
+
+ public GBKStreamingTransform(final Coder<K> keyCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.<String, PCollectionView<?>>emptyMap()); /* does not have
side inputs */
+ this.reduceFn = reduceFn;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.inputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactroy is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to
process a single element.
+ * The collected data are emitted at {@link
GBKStreamingTransform#onWatermark(Watermark)}
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("exception trigggered element " +
element.toString());
+ }
+ }
+
+ /**
+ * Process the collected data, trigger timers, and emit watermark to
downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void processElementsAndTriggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark
triggerWatermark) {
+ triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+ emitOutputWatermark();
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark,
watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyAndWatermarkHoldMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() >
prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() ==
outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() ==
minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) :
Collections.min(keyAndWatermarkHoldMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ return;
+ }
+ checkAndInvokeBundle();
+ inputWatermark = watermark;
+ // Triggering timers
+ try {
+ processElementsAndTriggerTimers(Instant.now(), Instant.now(),
inputWatermark);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ // Emit watermark to downstream operators
+ checkAndFinishBundle();
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp
max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ // Finish any pending windows by advancing the input watermark to infinity.
+ inputWatermark = new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(
+ BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE,
inputWatermark);
+ }
+
+ /**
+ * Trigger timers. When triggering, it emits the windowed data to downstream
operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark triggerWatermark) {
+
+ inMemoryTimerInternalsFactory.setInputWatermarkTime(new
Instant(triggerWatermark.getTimestamp()));
+ inMemoryTimerInternalsFactory.setProcessingTime(processingTime);
Review comment:
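Same question as for the `setInputWatermarkTime` call above: why do we need this function for `inMemoryTimerInternalsFactory`?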
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
Review comment:
Is this only for streaming, or is it for both streaming and batch windowed
operations? If it is for both, could you please update the comment and rename the
class, e.g., `GBKStreamingTransform` -> `GroupByKeyAndWindowDoFnTransform` or
`GBKWindowTransform`?
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
Review comment:
Consider renaming this field to `keyOutputWatermarkMap`.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
+
+ public GBKStreamingTransform(final Coder<K> keyCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.<String, PCollectionView<?>>emptyMap()); /* does not have
side inputs */
+ this.reduceFn = reduceFn;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.inputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactroy is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to
process a single element.
+ * The collected data are emitted at {@link
GBKStreamingTransform#onWatermark(Watermark)}
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("exception trigggered element " +
element.toString());
+ }
+ }
+
+ /**
+ * Process the collected data, trigger timers, and emit watermark to
downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void processElementsAndTriggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark
triggerWatermark) {
+ triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+ emitOutputWatermark();
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark,
watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyAndWatermarkHoldMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() >
prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() ==
outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() ==
minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) :
Collections.min(keyAndWatermarkHoldMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ return;
+ }
+ checkAndInvokeBundle();
+ inputWatermark = watermark;
+ // Triggering timers
+ try {
+ processElementsAndTriggerTimers(Instant.now(), Instant.now(),
inputWatermark);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ // Emit watermark to downstream operators
+ checkAndFinishBundle();
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp
max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ // Finish any pending windows by advancing the input watermark to infinity.
+ inputWatermark = new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(
+ BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE,
inputWatermark);
+ }
+
+ /**
+ * Trigger timers. When triggering, it emits the windowed data to downstream
operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark triggerWatermark) {
+
+ inMemoryTimerInternalsFactory.setInputWatermarkTime(new
Instant(triggerWatermark.getTimestamp()));
Review comment:
Why do we need to call this function on `inMemoryTimerInternalsFactory`?
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
+
+ public GBKStreamingTransform(final Coder<K> keyCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.<String, PCollectionView<?>>emptyMap()); /* does not have
side inputs */
+ this.reduceFn = reduceFn;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.inputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactroy is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to
process a single element.
+ * The collected data are emitted at {@link
GBKStreamingTransform#onWatermark(Watermark)}
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("exception trigggered element " +
element.toString());
+ }
+ }
+
+ /**
+ * Process the collected data, trigger timers, and emit watermark to
downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void processElementsAndTriggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark
triggerWatermark) {
+ triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+ emitOutputWatermark();
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark,
watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyAndWatermarkHoldMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() >
prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() ==
outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() ==
minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) :
Collections.min(keyAndWatermarkHoldMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ return;
+ }
+ checkAndInvokeBundle();
+ inputWatermark = watermark;
+ // Triggering timers
+ try {
+ processElementsAndTriggerTimers(Instant.now(), Instant.now(),
inputWatermark);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ // Emit watermark to downstream operators
+ checkAndFinishBundle();
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp
max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ // Finish any pending windows by advancing the input watermark to infinity.
+ inputWatermark = new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(
+ BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE,
inputWatermark);
+ }
+
+ /**
+ * Trigger timers. When triggering, it emits the windowed data to downstream
operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark triggerWatermark) {
+
+ inMemoryTimerInternalsFactory.setInputWatermarkTime(new
Instant(triggerWatermark.getTimestamp()));
+ inMemoryTimerInternalsFactory.setProcessingTime(processingTime);
+
inMemoryTimerInternalsFactory.setSynchronizedProcessingTime(synchronizedTime);
Review comment:
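Same question as above: why does `inMemoryTimerInternalsFactory` need `setProcessingTime` and `setSynchronizedProcessingTime`?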
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
+
+ public GBKStreamingTransform(final Coder<K> keyCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.<String, PCollectionView<?>>emptyMap()); /* does not have
side inputs */
+ this.reduceFn = reduceFn;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.inputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactroy is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to
process a single element.
+ * The collected data are emitted at {@link
GBKStreamingTransform#onWatermark(Watermark)}
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("exception trigggered element " +
element.toString());
+ }
+ }
+
+ /**
+ * Process the collected data, trigger timers, and emit watermark to
downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void processElementsAndTriggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark
triggerWatermark) {
+ triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+ emitOutputWatermark();
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark,
watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyAndWatermarkHoldMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() >
prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() ==
outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() ==
minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) :
Collections.min(keyAndWatermarkHoldMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ return;
Review comment:
How about throwing a RuntimeException here instead of silently returning? In fact, this should never happen.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/NemoStateBackend.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.state.State;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Keep track of states in {@link InMemoryStateInternals}. */
+public final class NemoStateBackend {
Review comment:
Why do we need this class?
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
Review comment:
Please remove the `volatile` modifier.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
+
+ public GBKStreamingTransform(final Coder<K> keyCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<KV<K, OutputT>> mainOutputTag,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn,
+ final DoFnSchemaInformation doFnSchemaInformation,
+ final DisplayData displayData) {
+ super(null,
+ null,
+ outputCoders,
+ mainOutputTag,
+ Collections.emptyList(), /* no additional outputs */
+ windowingStrategy,
+ Collections.emptyMap(), /* no additional side inputs */
+ options,
+ displayData,
+ doFnSchemaInformation,
+ Collections.<String, PCollectionView<?>>emptyMap()); /* does not have
side inputs */
+ this.reduceFn = reduceFn;
+ this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+ this.inputWatermark = new Watermark(Long.MIN_VALUE);
+ this.keyAndWatermarkHoldMap = new HashMap<>();
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ if (inMemoryStateInternalsFactory == null) {
+ this.inMemoryStateInternalsFactory = new
InMemoryStateInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryStateInternalFactroy is already set");
+ }
+
+ if (inMemoryTimerInternalsFactory == null) {
+ this.inMemoryTimerInternalsFactory = new
InMemoryTimerInternalsFactory<>();
+ } else {
+ LOG.info("InMemoryTimerInternalsFactory is already set");
+ }
+
+ // This function performs group by key and window operation.
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ inMemoryStateInternalsFactory,
+ inMemoryTimerInternalsFactory,
+ null, // does not have side input.
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ /** Wrapper function of output collector. */
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ originOc = oc;
+ return new GBKOutputCollector(oc);
+ }
+
+ /**
+ * Every time a single element arrives, this method invokes runner to
process a single element.
+ * The collected data are emitted at {@link
GBKStreamingTransform#onWatermark(Watermark)}
+ * @param element input data element.
+ */
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("exception trigggered element " +
element.toString());
+ }
+ }
+
+ /**
+ * Process the collected data, trigger timers, and emit watermark to
downstream operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void processElementsAndTriggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark
triggerWatermark) {
+ triggerTimers(processingTime, synchronizedTime, triggerWatermark);
+ emitOutputWatermark();
+ }
+
+ /**
+ * Emit watermark to downstream operators.
+ * Output watermark = max(prev output watermark, min(input watermark,
watermark holds)).
+ */
+ private void emitOutputWatermark() {
+ // Find min watermark hold
+ Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ : Collections.min(keyAndWatermarkHoldMap.values());
+
+ Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+
+ while (outputWatermarkCandidate.getTimestamp() >
prevOutputWatermark.getTimestamp()) {
+ // Progress
+ prevOutputWatermark = outputWatermarkCandidate;
+ // Emit watermark
+ getOutputCollector().emitWatermark(outputWatermarkCandidate);
+ // Remove minimum watermark holds
+ if (minWatermarkHold.getTimestamp() ==
outputWatermarkCandidate.getTimestamp()) {
+ final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+ keyAndWatermarkHoldMap.entrySet()
+ .removeIf(entry -> entry.getValue().getTimestamp() ==
minWatermarkTimestamp);
+ }
+
+ minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) :
Collections.min(keyAndWatermarkHoldMap.values());
+
+ outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(),
inputWatermark.getTimestamp())));
+ }
+ }
+
+ /**
+ * Trigger timers that need to be fired.
+ * @param watermark watermark
+ */
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+ return;
+ }
+ checkAndInvokeBundle();
+ inputWatermark = watermark;
+ // Triggering timers
+ try {
+ processElementsAndTriggerTimers(Instant.now(), Instant.now(),
inputWatermark);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ // Emit watermark to downstream operators
+ checkAndFinishBundle();
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp
max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ // Finish any pending windows by advancing the input watermark to infinity.
+ inputWatermark = new
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(
+ BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE,
inputWatermark);
+ }
+
+ /**
+ * Trigger timers. When triggering, it emits the windowed data to downstream
operators.
+ * @param processingTime processing time
+ * @param synchronizedTime synchronized time
+ * @param triggerWatermark watermark
+ */
+ private void triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime,
+ final Watermark triggerWatermark) {
+
+ inMemoryTimerInternalsFactory.setInputWatermarkTime(new
Instant(triggerWatermark.getTimestamp()));
Review comment:
Can't we use `InMemoryTimerInternals.advanceInputWatermark`?
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKStreamingTransform.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs GroupByKey or CombinePerKey operation for streaming
data.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class GBKStreamingTransform<K, InputT, OutputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K,
OutputT>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GBKStreamingTransform.class.getName());
+ private final SystemReduceFn reduceFn;
+ private transient InMemoryTimerInternalsFactory<K>
inMemoryTimerInternalsFactory;
+ private transient InMemoryStateInternalsFactory<K>
inMemoryStateInternalsFactory;
+ private volatile Watermark prevOutputWatermark;
+ private volatile Map<K, Watermark> keyAndWatermarkHoldMap;
+ private volatile Watermark inputWatermark;
+ private transient OutputCollector originOc;
+ private volatile boolean dataReceived = false;
Review comment:
Please remove the `volatile` modifier from `dataReceived`.
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/NemoTimerInternals.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.nemo.common.Pair;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Keep track of timer for a specific key.
+ * @param <K> key type
+ */
+public class NemoTimerInternals<K> implements TimerInternals {
Review comment:
Do we need this class?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org