[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246604 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static java.util.Collections.emptyList; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public final class CreateViewTransformTest { + + private final static Coder NULL_INPUT_CODER = null; + private final static Map, Coder> NULL_OUTPUT_CODERS = null; + + // [ window1 ] [--- window2 ---] + // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // (null, "hello") + // (null, "world") + // (null, "hello") + // ==> window1: {3} (calculate # of elements) + // (null, "a") + // (null,"a") + // (null,"a") + // (null,"b") + // => window2: {4} (calculate # of elements) + @Test + @SuppressWarnings("unchecked") + public void test() { + +final TupleTag outputTag = new TupleTag<>("main-output"); +final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); +final GroupByKeyAndWindowDoFnTransform gbkTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +Collections.emptyList(), /* 
additional outputs */ +WindowingStrategy.of(fixedwindows), +emptyList(), /* side inputs */ +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER)); +final CreateViewTransform viewTransform = + new CreateViewTransform(new SumViewFn()); + +final Instant ts1 = new Instant(1); +final Instant ts2 = new Instant(100); +final Instant ts3 = new Instant(300); +final Watermark watermark = new Watermark(1003); +final Instant ts4 = new Instant(1200); +final Watermark watermark2 = new Watermark(1400); +final Instant ts5 = new Instant(1600); +final Instant ts6 = new Instant(1800); +final Instant ts7 = new Instant(1900); +final Watermark watermark3 = new Watermark(2100); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector oc = new TestOutputCollector(); +viewTransform.prepare(context, oc); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + +
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246375 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -113,14 +115,19 @@ public void onData(final WindowedValue> element) { /** * Process the collected data and trigger timers. - * @param watermark current watermark + * @param inputWatermark current input watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void processElementsAndTriggerTimers(final Watermark watermark, + private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { -keyToValues.forEach((key, val) -> { +long outputTimestamp = Long.MAX_VALUE; + +for (final Map.Entry>> entry : keyToValues.entrySet()) { + final K key = entry.getKey(); + final List> val = entry.getValue(); Review comment: val -> values This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244875 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -113,14 +115,19 @@ public void onData(final WindowedValue> element) { /** * Process the collected data and trigger timers. - * @param watermark current watermark + * @param inputWatermark current input watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ - private void processElementsAndTriggerTimers(final Watermark watermark, + private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { -keyToValues.forEach((key, val) -> { +long outputTimestamp = Long.MAX_VALUE; Review comment: minOutputTimestampOfEmittedWindows? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245464 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -132,15 +139,26 @@ private void processElementsAndTriggerTimers(final Watermark watermark, } // Trigger timers - triggerTimers(key, watermark, processingTime, synchronizedTime); + final long keyOutputTimestamp = Review comment: minimumTimestampOfEmittedWindows? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230246770 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static java.util.Collections.emptyList; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public final class CreateViewTransformTest { + + private final static Coder NULL_INPUT_CODER = null; + private final static Map, Coder> NULL_OUTPUT_CODERS = null; + + // [ window1 ] [--- window2 ---] + // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // (null, "hello") + // (null, "world") + // (null, "hello") + // ==> window1: {3} (calculate # of elements) + // (null, "a") + // (null,"a") + // (null,"a") + // (null,"b") + // => window2: {4} (calculate # of elements) + @Test + @SuppressWarnings("unchecked") + public void test() { + +final TupleTag outputTag = new TupleTag<>("main-output"); +final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); +final GroupByKeyAndWindowDoFnTransform gbkTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +Collections.emptyList(), /* 
additional outputs */ +WindowingStrategy.of(fixedwindows), +emptyList(), /* side inputs */ +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER)); +final CreateViewTransform viewTransform = + new CreateViewTransform(new SumViewFn()); + +final Instant ts1 = new Instant(1); +final Instant ts2 = new Instant(100); +final Instant ts3 = new Instant(300); +final Watermark watermark = new Watermark(1003); +final Instant ts4 = new Instant(1200); +final Watermark watermark2 = new Watermark(1400); +final Instant ts5 = new Instant(1600); +final Instant ts6 = new Instant(1800); +final Instant ts7 = new Instant(1900); +final Watermark watermark3 = new Watermark(2100); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector oc = new TestOutputCollector(); +viewTransform.prepare(context, oc); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + +
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245211 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -191,16 +211,15 @@ private void triggerTimers(final K key, // output watermark // we set output watermark to the minimum of the timer data -long outputWatermark = Long.MAX_VALUE; +long keyOutputTimestamp = Long.MAX_VALUE; for (final TimerInternals.TimerData timer : timerDataList) { - if (outputWatermark > timer.getTimestamp().getMillis()) { -outputWatermark = timer.getTimestamp().getMillis(); + if (keyOutputTimestamp > timer.getTimestamp().getMillis()) { Review comment: Math.min? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230247105 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static java.util.Collections.emptyList; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public final class CreateViewTransformTest { + + private final static Coder NULL_INPUT_CODER = null; + private final static Map, Coder> NULL_OUTPUT_CODERS = null; + + // [ window1 ] [--- window2 ---] + // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // (null, "hello") + // (null, "world") + // (null, "hello") + // ==> window1: {3} (calculate # of elements) + // (null, "a") + // (null,"a") + // (null,"a") + // (null,"b") + // => window2: {4} (calculate # of elements) + @Test + @SuppressWarnings("unchecked") + public void test() { + +final TupleTag outputTag = new TupleTag<>("main-output"); +final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); +final GroupByKeyAndWindowDoFnTransform gbkTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +Collections.emptyList(), /* 
additional outputs */ +WindowingStrategy.of(fixedwindows), +emptyList(), /* side inputs */ +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER)); +final CreateViewTransform viewTransform = + new CreateViewTransform(new SumViewFn()); + +final Instant ts1 = new Instant(1); +final Instant ts2 = new Instant(100); +final Instant ts3 = new Instant(300); +final Watermark watermark = new Watermark(1003); +final Instant ts4 = new Instant(1200); +final Watermark watermark2 = new Watermark(1400); +final Instant ts5 = new Instant(1600); +final Instant ts6 = new Instant(1800); +final Instant ts7 = new Instant(1900); +final Watermark watermark3 = new Watermark(2100); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector oc = new TestOutputCollector(); +viewTransform.prepare(context, oc); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + +viewTransform.onData(WindowedValue.of( + KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + +
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245355 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -179,7 +199,7 @@ private void triggerTimers(final K key, final List timerDataList = getEligibleTimers(timerInternals); if (timerDataList.isEmpty()) { - return; + return Long.MAX_VALUE; Review comment: How about Optional.empty()? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244319 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ## @@ -57,23 +63,69 @@ public void prepare(final Context context, final OutputCollector element) { -// TODO #216: support window in view -final KV kv = ((WindowedValue) element).getValue(); -multiView.getDataList().add(kv.getValue()); + public void onData(final WindowedValue> element) { +// The key of element is always null (beam's semantic) +// because view is a globally materialized data regardless of key +for (final BoundedWindow window : element.getWindows()) { + windowListMap.putIfAbsent(window, new ArrayList<>()); + final List list = windowListMap.get(window); + list.add(element.getValue().getValue()); +} + } + + @Override + public void onWatermark(final Watermark inputWatermark) { + +// If no data, just forwards the watermark +if (windowListMap.size() == 0 && outputWatermark < inputWatermark.getTimestamp()) { + outputWatermark = inputWatermark.getTimestamp(); + outputCollector.emitWatermark(inputWatermark); + return; +} + +final Iterator>> iterator = windowListMap.entrySet().iterator(); +long outputTimestamp = Long.MAX_VALUE; Review comment: minOutputTimestampOfEmittedWindows? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245547 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -132,15 +139,26 @@ private void processElementsAndTriggerTimers(final Watermark watermark, } // Trigger timers - triggerTimers(key, watermark, processingTime, synchronizedTime); + final long keyOutputTimestamp = +triggerTimers(key, inputWatermark, processingTime, synchronizedTime); + if (outputTimestamp > keyOutputTimestamp) { Review comment: Math.min? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244505 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ## @@ -18,37 +18,43 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; +import java.util.*; /** - * CreateView transform implementation. - * @param input type. - * @param output type. + * This transforms emits materialized data for each window. + * @param input type + * @param materialized output type */ -public final class CreateViewTransform extends NoWatermarkEmitTransform, WindowedValue> { - private final PCollectionView pCollectionView; +public final class CreateViewTransform implements + Transform>, WindowedValue> { private OutputCollector> outputCollector; private final ViewFn, O> viewFn; - private final MultiView multiView; + private final Map> windowListMap; + + // TODO #XXX: we should remove this variable by refactoring broadcast worker for side input + private boolean isEmitted = false; + private long outputWatermark; Review comment: lastEmittedWatermark? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230245057 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -157,12 +175,14 @@ protected void beforeClose() { /** * Trigger times for current key. * When triggering, it emits the windowed data to downstream operators. + * It returns the minimum output timestamp. + * If no data is emitted, it returns Long.MAX_VALUE. * @param key key * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time */ Review comment: @return This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230243532 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ## @@ -18,37 +18,43 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.OutputCollector; -import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; +import java.util.*; /** - * CreateView transform implementation. - * @param input type. - * @param output type. + * This transforms emits materialized data for each window. + * @param input type + * @param materialized output type */ -public final class CreateViewTransform extends NoWatermarkEmitTransform, WindowedValue> { - private final PCollectionView pCollectionView; +public final class CreateViewTransform implements + Transform>, WindowedValue> { private OutputCollector> outputCollector; private final ViewFn, O> viewFn; - private final MultiView multiView; + private final Map> windowListMap; + + // TODO #XXX: we should remove this variable by refactoring broadcast worker for side input Review comment: Please create a JIRA issue. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data
johnyangk commented on a change in pull request #141: [NEMO-252] Fix CreatViewTransform to emit windowed materialized data URL: https://github.com/apache/incubator-nemo/pull/141#discussion_r230244234 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ## @@ -57,23 +63,69 @@ public void prepare(final Context context, final OutputCollector element) { -// TODO #216: support window in view -final KV kv = ((WindowedValue) element).getValue(); -multiView.getDataList().add(kv.getValue()); + public void onData(final WindowedValue> element) { +// The key of element is always null (beam's semantic) +// because view is a globally materialized data regardless of key +for (final BoundedWindow window : element.getWindows()) { + windowListMap.putIfAbsent(window, new ArrayList<>()); + final List list = windowListMap.get(window); + list.add(element.getValue().getValue()); +} + } + + @Override + public void onWatermark(final Watermark inputWatermark) { + +// If no data, just forwards the watermark +if (windowListMap.size() == 0 && outputWatermark < inputWatermark.getTimestamp()) { + outputWatermark = inputWatermark.getTimestamp(); + outputCollector.emitWatermark(inputWatermark); + return; +} + +final Iterator>> iterator = windowListMap.entrySet().iterator(); +long outputTimestamp = Long.MAX_VALUE; + +while (iterator.hasNext()) { + final Map.Entry> entry = iterator.next(); + if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) { +// emit the windowed data if the watermark timestamp > the window max boundary +final O view = viewFn.apply(new MultiView<>(entry.getValue())); +outputCollector.emit(WindowedValue.of( + view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING)); +iterator.remove(); +isEmitted = true; + +if (outputTimestamp > entry.getKey().maxTimestamp().getMillis()) { Review comment: outputTimeStamp = Math.max(outputTimeStamp, 
entry.getKey().maxTimestamp().getMillis()) ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services