[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246604
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map, Coder> NULL_OUTPUT_CODERS = null;
+
+  // [ window1 ] [--- window2 ---]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- 
watermark7
+  // (null, "hello")
+  //  (null, "world")
+  // (null, "hello")
+  //   ==> window1: {3} (calculate # of elements)
+  // (null, "a")
+  //   (null,"a")
+  // (null,"a")
+  //  
(null,"b")
+  //   => window2: {4} 
(calculate # of elements)
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final FixedWindows fixedwindows = 
FixedWindows.of(Duration.standardSeconds(1));
+final GroupByKeyAndWindowDoFnTransform gbkTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+Collections.emptyList(), /* additional outputs */
+WindowingStrategy.of(fixedwindows),
+emptyList(), /* side inputs */
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER));
+final CreateViewTransform viewTransform =
+  new CreateViewTransform(new SumViewFn());
+
+final Instant ts1 = new Instant(1);
+final Instant ts2 = new Instant(100);
+final Instant ts3 = new Instant(300);
+final Watermark watermark = new Watermark(1003);
+final Instant ts4 = new Instant(1200);
+final Watermark watermark2 = new Watermark(1400);
+final Instant ts5 = new Instant(1600);
+final Instant ts6 = new Instant(1800);
+final Instant ts7 = new Instant(1900);
+final Watermark watermark3 = new Watermark(2100);
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector oc = new TestOutputCollector();
+viewTransform.prepare(context, oc);
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), 
PaneInfo.NO_FIRING));
+
+

[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246375
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -113,14 +115,19 @@ public void onData(final WindowedValue> 
element) {
 
   /**
* Process the collected data and trigger timers.
-   * @param watermark current watermark
+   * @param inputWatermark current input watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
-  private void processElementsAndTriggerTimers(final Watermark watermark,
+  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
final Instant processingTime,
final Instant synchronizedTime) 
{
-keyToValues.forEach((key, val) -> {
+long outputTimestamp = Long.MAX_VALUE;
+
+for (final Map.Entry>> entry : 
keyToValues.entrySet()) {
+  final K key = entry.getKey();
+  final List> val = entry.getValue();
 
 Review comment:
   Rename `val` to `values`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244875
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -113,14 +115,19 @@ public void onData(final WindowedValue> 
element) {
 
   /**
* Process the collected data and trigger timers.
-   * @param watermark current watermark
+   * @param inputWatermark current input watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
-  private void processElementsAndTriggerTimers(final Watermark watermark,
+  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
final Instant processingTime,
final Instant synchronizedTime) 
{
-keyToValues.forEach((key, val) -> {
+long outputTimestamp = Long.MAX_VALUE;
 
 Review comment:
   maxOutputTimeStampOfEmittedWindows?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245464
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -132,15 +139,26 @@ private void processElementsAndTriggerTimers(final 
Watermark watermark,
   }
 
   // Trigger timers
-  triggerTimers(key, watermark, processingTime, synchronizedTime);
+  final long keyOutputTimestamp =
 
 Review comment:
   minimumTimestampOfEmittedWindows?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246770
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map, Coder> NULL_OUTPUT_CODERS = null;
+
+  // [ window1 ] [--- window2 ---]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- 
watermark7
+  // (null, "hello")
+  //  (null, "world")
+  // (null, "hello")
+  //   ==> window1: {3} (calculate # of elements)
+  // (null, "a")
+  //   (null,"a")
+  // (null,"a")
+  //  
(null,"b")
+  //   => window2: {4} 
(calculate # of elements)
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final FixedWindows fixedwindows = 
FixedWindows.of(Duration.standardSeconds(1));
+final GroupByKeyAndWindowDoFnTransform gbkTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+Collections.emptyList(), /* additional outputs */
+WindowingStrategy.of(fixedwindows),
+emptyList(), /* side inputs */
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER));
+final CreateViewTransform viewTransform =
+  new CreateViewTransform(new SumViewFn());
+
+final Instant ts1 = new Instant(1);
+final Instant ts2 = new Instant(100);
+final Instant ts3 = new Instant(300);
+final Watermark watermark = new Watermark(1003);
+final Instant ts4 = new Instant(1200);
+final Watermark watermark2 = new Watermark(1400);
+final Instant ts5 = new Instant(1600);
+final Instant ts6 = new Instant(1800);
+final Instant ts7 = new Instant(1900);
+final Watermark watermark3 = new Watermark(2100);
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector oc = new TestOutputCollector();
+viewTransform.prepare(context, oc);
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), 
PaneInfo.NO_FIRING));
+
+

[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245211
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -191,16 +211,15 @@ private void triggerTimers(final K key,
 
 // output watermark
 // we set output watermark to the minimum of the timer data
-long outputWatermark = Long.MAX_VALUE;
+long keyOutputTimestamp = Long.MAX_VALUE;
 for (final TimerInternals.TimerData timer : timerDataList) {
-  if (outputWatermark > timer.getTimestamp().getMillis()) {
-outputWatermark = timer.getTimestamp().getMillis();
+  if (keyOutputTimestamp > timer.getTimestamp().getMillis()) {
 
 Review comment:
   Math.min?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230247105
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map, Coder> NULL_OUTPUT_CODERS = null;
+
+  // [ window1 ] [--- window2 ---]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- 
watermark7
+  // (null, "hello")
+  //  (null, "world")
+  // (null, "hello")
+  //   ==> window1: {3} (calculate # of elements)
+  // (null, "a")
+  //   (null,"a")
+  // (null,"a")
+  //  
(null,"b")
+  //   => window2: {4} 
(calculate # of elements)
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final FixedWindows fixedwindows = 
FixedWindows.of(Duration.standardSeconds(1));
+final GroupByKeyAndWindowDoFnTransform gbkTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+Collections.emptyList(), /* additional outputs */
+WindowingStrategy.of(fixedwindows),
+emptyList(), /* side inputs */
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER));
+final CreateViewTransform viewTransform =
+  new CreateViewTransform(new SumViewFn());
+
+final Instant ts1 = new Instant(1);
+final Instant ts2 = new Instant(100);
+final Instant ts3 = new Instant(300);
+final Watermark watermark = new Watermark(1003);
+final Instant ts4 = new Instant(1200);
+final Watermark watermark2 = new Watermark(1400);
+final Instant ts5 = new Instant(1600);
+final Instant ts6 = new Instant(1800);
+final Instant ts7 = new Instant(1900);
+final Watermark watermark3 = new Watermark(2100);
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector oc = new TestOutputCollector();
+viewTransform.prepare(context, oc);
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), 
PaneInfo.NO_FIRING));
+
+viewTransform.onData(WindowedValue.of(
+  KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), 
PaneInfo.NO_FIRING));
+
+

[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245355
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -179,7 +199,7 @@ private void triggerTimers(final K key,
 final List timerDataList = 
getEligibleTimers(timerInternals);
 
 if (timerDataList.isEmpty()) {
-  return;
+  return Long.MAX_VALUE;
 
 Review comment:
   How about Optional.empty()?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244319
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##
 @@ -57,23 +63,69 @@ public void prepare(final Context context, final 
OutputCollector element) {
-// TODO #216: support window in view
-final KV kv = ((WindowedValue) element).getValue();
-multiView.getDataList().add(kv.getValue());
+  public void onData(final WindowedValue> element) {
+// The key of element is always null (beam's semantic)
+// because view is a globally materialized data regardless of key
+for (final BoundedWindow window : element.getWindows()) {
+  windowListMap.putIfAbsent(window, new ArrayList<>());
+  final List list = windowListMap.get(window);
+  list.add(element.getValue().getValue());
+}
+  }
+
+  @Override
+  public void onWatermark(final Watermark inputWatermark) {
+
+// If no data, just forwards the watermark
+if (windowListMap.size() == 0 && outputWatermark < 
inputWatermark.getTimestamp()) {
+  outputWatermark = inputWatermark.getTimestamp();
+  outputCollector.emitWatermark(inputWatermark);
+  return;
+}
+
+final Iterator>> iterator = 
windowListMap.entrySet().iterator();
+long outputTimestamp = Long.MAX_VALUE;
 
 Review comment:
   maxOutputTimeStampOfEmittedWindows?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245547
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -132,15 +139,26 @@ private void processElementsAndTriggerTimers(final 
Watermark watermark,
   }
 
   // Trigger timers
-  triggerTimers(key, watermark, processingTime, synchronizedTime);
+  final long keyOutputTimestamp =
+triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
+  if (outputTimestamp > keyOutputTimestamp) {
 
 Review comment:
   Math.max?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244505
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##
 @@ -18,37 +18,43 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
- * CreateView transform implementation.
- * @param  input type.
- * @param  output type.
+ * This transforms emits materialized data for each window.
+ * @param  input type
+ * @param  materialized output type
  */
-public final class CreateViewTransform extends 
NoWatermarkEmitTransform, WindowedValue> {
-  private final PCollectionView pCollectionView;
+public final class CreateViewTransform implements
+  Transform>, WindowedValue> {
   private OutputCollector> outputCollector;
   private final ViewFn, O> viewFn;
-  private final MultiView multiView;
+  private final Map> windowListMap;
+
+  // TODO #XXX: we should remove this variable by refactoring broadcast worker 
for side input
+  private boolean isEmitted = false;
+  private long outputWatermark;
 
 Review comment:
   lastEmittedWatermark?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245057
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -157,12 +175,14 @@ protected void beforeClose() {
   /**
* Trigger times for current key.
* When triggering, it emits the windowed data to downstream operators.
+   * It returns the minimum output timestamp.
+   * If no data is emitted, it returns Long.MAX_VALUE.
* @param key key
* @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
 
 Review comment:
   Please add an `@return` tag describing the returned minimum output timestamp.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230243532
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##
 @@ -18,37 +18,43 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
- * CreateView transform implementation.
- * @param  input type.
- * @param  output type.
+ * This transforms emits materialized data for each window.
+ * @param  input type
+ * @param  materialized output type
  */
-public final class CreateViewTransform extends 
NoWatermarkEmitTransform, WindowedValue> {
-  private final PCollectionView pCollectionView;
+public final class CreateViewTransform implements
+  Transform>, WindowedValue> {
   private OutputCollector> outputCollector;
   private final ViewFn, O> viewFn;
-  private final MultiView multiView;
+  private final Map> windowListMap;
+
+  // TODO #XXX: we should remove this variable by refactoring broadcast worker 
for side input
 
 Review comment:
   Please create a JIRA issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data

2018-11-01 Thread GitBox
johnyangk commented on a change in pull request #141: [NEMO-252] Fix 
CreatViewTransform to emit windowed materialized data
URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244234
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##
 @@ -57,23 +63,69 @@ public void prepare(final Context context, final 
OutputCollector element) {
-// TODO #216: support window in view
-final KV kv = ((WindowedValue) element).getValue();
-multiView.getDataList().add(kv.getValue());
+  public void onData(final WindowedValue> element) {
+// The key of element is always null (beam's semantic)
+// because view is a globally materialized data regardless of key
+for (final BoundedWindow window : element.getWindows()) {
+  windowListMap.putIfAbsent(window, new ArrayList<>());
+  final List list = windowListMap.get(window);
+  list.add(element.getValue().getValue());
+}
+  }
+
+  @Override
+  public void onWatermark(final Watermark inputWatermark) {
+
+// If no data, just forwards the watermark
+if (windowListMap.size() == 0 && outputWatermark < 
inputWatermark.getTimestamp()) {
+  outputWatermark = inputWatermark.getTimestamp();
+  outputCollector.emitWatermark(inputWatermark);
+  return;
+}
+
+final Iterator>> iterator = 
windowListMap.entrySet().iterator();
+long outputTimestamp = Long.MAX_VALUE;
+
+while (iterator.hasNext()) {
+  final Map.Entry> entry = iterator.next();
+  if (entry.getKey().maxTimestamp().getMillis() <= 
inputWatermark.getTimestamp()) {
+// emit the windowed data if the watermark timestamp > the window max 
boundary
+final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+outputCollector.emit(WindowedValue.of(
+  view, entry.getKey().maxTimestamp(), entry.getKey(), 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+iterator.remove();
+isEmitted = true;
+
+if (outputTimestamp > entry.getKey().maxTimestamp().getMillis()) {
 
 Review comment:
   `outputTimeStamp = Math.max(outputTimeStamp, entry.getKey().maxTimestamp().getMillis())`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services