[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118339=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118339 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:33 Start Date: 02/Jul/18 18:33 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java index 35399b92e4c..178c4c1c0d8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java @@ -18,20 +18,40 @@ package org.apache.beam.fn.harness; import com.google.auto.service.AutoService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; import java.io.IOException; +import java.util.Collection; import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.control.BundleSplitListener; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver; +import org.apache.beam.fn.harness.state.BeamFnStateClient; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import 
org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms; import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.fn.function.ThrowingFunction; +import org.apache.beam.sdk.fn.function.ThrowingRunnable; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.KV; /** Executes different components of Combine PTransforms. */ -public class CombineRunners { +public class CombineRunners { /** A registrar which provides a factory to handle combine component PTransforms. 
*/ @AutoService(PTransformRunnerFactory.Registrar.class) @@ -41,7 +61,7 @@ public Map getPTransformRunnerFactories() { return ImmutableMap.of( BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE), - MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction), + new PrecombineFactory(), BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS), MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction), BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS), @@ -51,17 +71,120 @@ } } - static - ThrowingFunction, KV> createPrecombineMapFunction( - String pTransformId, PTransform pTransform) throws IOException { -CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); -CombineFn combineFn = -(CombineFn) -SerializableUtils.deserializeFromByteArray( - combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { +
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118312=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118312 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303656 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness; + +/** An interface that groups inputs to an accumulator and flushes the output. */ +public interface GroupingTable { + + /** Abstract interface of things that accept inputs one at a time via process(). */ + interface Receiver { Review comment: I don't think we'll need to make this generic in this sense. Consider using `FnDataReceiver` directly here instead of `Receiver`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118312) Time Spent: 4h 40m (was: 4.5h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 4h 40m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118317=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118317 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199305122 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java ## @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** Static utility methods that provide {@link GroupingTable} implementations. */ +public class PrecombineGroupingTable +implements GroupingTable { + /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */ + public static GroupingTable, InputT, AccumT> combining( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +new CoderSizeEstimator<>(accumulatorCoder)); + } + + /** + * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link + * SizeEstimator SizeEstimators}. 
+ */ + public static + GroupingTable, InputT, AccumT> combiningAndSampling( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder, + double sizeEstimatorSampleRate) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +sizeEstimatorSampleRate, +1.0), +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0)); + } + + /** Provides client-specific operations for grouping keys. */ + public interface GroupingKeyCreator { +Object createGroupingKey(K key) throws Exception; + } + + /** Implements Precombine GroupingKeyCreator via Coder. */ + public static class WindowingCoderGroupingKeyCreator + implements GroupingKeyCreator> { + +private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE; + +private final Coder coder; + +
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118315 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303702 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness; + +/** An interface that groups inputs to an accumulator and flushes the output. */ +public interface GroupingTable { + + /** Abstract interface of things that accept inputs one at a time via process(). */ + interface Receiver { +/** Processes the element. */ +void process(Object outputElem) throws Exception; + } + + /** Adds a pair to this table, possibly flushing some entries to output if the table is full. 
*/ + void put(Object pair, Receiver receiver) throws Exception; Review comment: You can use `KV` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118315) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118318=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118318 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199577433 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java ## @@ -93,6 +93,7 @@ public Integer extractOutput(Integer accum) { private RunnerApi.PTransform pTransform; private String inputPCollectionId; private String outputPCollectionId; + private RunnerApi.Pipeline pProto; Review comment: nit: `pProto` -> `pipeline` or `pipelineProto` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118318) Time Spent: 5h 20m (was: 5h 10m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 5h 20m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118316=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118316 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303794 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java ## @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** Static utility methods that provide {@link GroupingTable} implementations. */ +public class PrecombineGroupingTable Review comment: Consider calling this class `GroupingTables`. I don't know how much re-use we will get on this class, you can make it an internal detail of `CombineRunners` instead of exposing it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118316) Time Spent: 5h 10m (was: 5h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 5h 10m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118311=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118311 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303571 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -51,17 +71,120 @@ } } - static - ThrowingFunction, KV> createPrecombineMapFunction( - String pTransformId, PTransform pTransform) throws IOException { -CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); -CombineFn combineFn = -(CombineFn) -SerializableUtils.deserializeFromByteArray( - combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + groupingTable = + PrecombineGroupingTable.combiningAndSampling( + options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem)); Review comment: if you use a cast, you should be able to pass this in as a method reference instead of using a lambda This is an automated 
message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118311) Time Spent: 4.5h (was: 4h 20m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 4.5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118310 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303366 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -51,17 +71,120 @@ } } - static - ThrowingFunction, KV> createPrecombineMapFunction( - String pTransformId, PTransform pTransform) throws IOException { -CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); -CombineFn combineFn = -(CombineFn) -SerializableUtils.deserializeFromByteArray( - combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + groupingTable = + PrecombineGroupingTable.combiningAndSampling( + options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem)); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> output.accept((WindowedValue>) outputElem)); +} 
+ } + + /** A factory for {@link PrecombineRunner}s. */ + @VisibleForTesting + public static class PrecombineFactory + implements PTransformRunnerFactory> { + +@Override +public PrecombineRunner createRunnerForPTransform( +PipelineOptions pipelineOptions, +BeamFnDataClient beamFnDataClient, +BeamFnStateClient beamFnStateClient, +String pTransformId, +PTransform pTransform, +Supplier processBundleInstructionId, +Map pCollections, +Map coders, +Map windowingStrategies, +Multimap>> pCollectionIdsToConsumers, +Consumer addStartFunction, +Consumer addFinishFunction, +BundleSplitListener splitListener) +throws IOException { + // Get objects needed to create the runner. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllWindowingStrategies(windowingStrategies) + .build()); + String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet()); + RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); + + // Input coder may sometimes be WindowedValueCoder depending on runner, instead of the + // expected KvCoder. + Coder uncastInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId()); + KvCoder inputCoder; Review comment: You don't use the `inputCoder` anywhere except to get the key coder. Consider dropping the local variable `inputCoder` and setting `keyCoder` directly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118310) Time Spent: 4h 20m (was: 4h 10m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major >
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118321=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118321 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199305044 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java ## @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** Static utility methods that provide {@link GroupingTable} implementations. */ +public class PrecombineGroupingTable +implements GroupingTable { + /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */ + public static GroupingTable, InputT, AccumT> combining( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +new CoderSizeEstimator<>(accumulatorCoder)); + } + + /** + * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link + * SizeEstimator SizeEstimators}. 
+ */ + public static + GroupingTable, InputT, AccumT> combiningAndSampling( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder, + double sizeEstimatorSampleRate) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +sizeEstimatorSampleRate, +1.0), +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0)); + } + + /** Provides client-specific operations for grouping keys. */ + public interface GroupingKeyCreator { +Object createGroupingKey(K key) throws Exception; + } + + /** Implements Precombine GroupingKeyCreator via Coder. */ + public static class WindowingCoderGroupingKeyCreator + implements GroupingKeyCreator> { + +private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE; + +private final Coder coder; + +
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118323=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118323 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#issuecomment-401887478 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118323) Time Spent: 5h 40m (was: 5.5h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 5h 40m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118319=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118319 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199576986 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java ## @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** Static utility methods that provide {@link GroupingTable} implementations. */ +public class PrecombineGroupingTable +implements GroupingTable { + /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */ + public static GroupingTable, InputT, AccumT> combining( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +new CoderSizeEstimator<>(accumulatorCoder)); + } + + /** + * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link + * SizeEstimator SizeEstimators}. 
+ */ + public static + GroupingTable, InputT, AccumT> combiningAndSampling( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder, + double sizeEstimatorSampleRate) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +sizeEstimatorSampleRate, +1.0), +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0)); + } + + /** Provides client-specific operations for grouping keys. */ + public interface GroupingKeyCreator { +Object createGroupingKey(K key) throws Exception; + } + + /** Implements Precombine GroupingKeyCreator via Coder. */ + public static class WindowingCoderGroupingKeyCreator + implements GroupingKeyCreator> { + +private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE; + +private final Coder coder; + +
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118313=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118313 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303580 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -51,17 +71,120 @@ } } - static - ThrowingFunction, KV> createPrecombineMapFunction( - String pTransformId, PTransform pTransform) throws IOException { -CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); -CombineFn combineFn = -(CombineFn) -SerializableUtils.deserializeFromByteArray( - combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + groupingTable = + PrecombineGroupingTable.combiningAndSampling( + options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem)); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> output.accept((WindowedValue>) outputElem)); 
Review comment: ditto here, if you use a cast, you should be able to pass this in as a method reference instead of using a lambda This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118313) Time Spent: 4h 50m (was: 4h 40m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118314=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118314 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199303721 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness; + +/** An interface that groups inputs to an accumulator and flushes the output. */ +public interface GroupingTable { + + /** Abstract interface of things that accept inputs one at a time via process(). */ + interface Receiver { +/** Processes the element. */ +void process(Object outputElem) throws Exception; + } + + /** Adds a pair to this table, possibly flushing some entries to output if the table is full. */ Review comment: `pair` -> `keyed value` This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 118314) Time Spent: 5h (was: 4h 50m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118320=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118320 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 02/Jul/18 18:06 Start Date: 02/Jul/18 18:06 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199304718 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java ## @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** Static utility methods that provide {@link GroupingTable} implementations. */ +public class PrecombineGroupingTable +implements GroupingTable { + /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */ + public static GroupingTable, InputT, AccumT> combining( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +new CoderSizeEstimator<>(accumulatorCoder)); + } + + /** + * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link + * SizeEstimator SizeEstimators}. 
+ */ + public static + GroupingTable, InputT, AccumT> combiningAndSampling( + PipelineOptions options, + CombineFn combineFn, + Coder keyCoder, + Coder accumulatorCoder, + double sizeEstimatorSampleRate) { +Combiner, InputT, AccumT, ?> valueCombiner = +new ValueCombiner<>( +GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); +return new PrecombineGroupingTable<>( +DEFAULT_MAX_GROUPING_TABLE_BYTES, +new WindowingCoderGroupingKeyCreator<>(keyCoder), +WindowedPairInfo.create(), +valueCombiner, +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), +sizeEstimatorSampleRate, +1.0), +new SamplingSizeEstimator<>( +new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0)); + } + + /** Provides client-specific operations for grouping keys. */ + public interface GroupingKeyCreator { +Object createGroupingKey(K key) throws Exception; + } + + /** Implements Precombine GroupingKeyCreator via Coder. */ + public static class WindowingCoderGroupingKeyCreator + implements GroupingKeyCreator> { + +private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE; + +private final Coder coder; + +
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117683=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117683 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 29/Jun/18 23:59 Start Date: 29/Jun/18 23:59 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#issuecomment-401501232 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 117683) Time Spent: 4h 10m (was: 4h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 4h 10m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117682=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117682 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 29/Jun/18 23:59 Start Date: 29/Jun/18 23:59 Worklog Time Spent: 10m Work Description: youngoli removed a comment on issue #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#issuecomment-401499487 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 117682) Time Spent: 4h (was: 3h 50m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 4h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117671=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117671 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 29/Jun/18 23:42 Start Date: 29/Jun/18 23:42 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#issuecomment-401499487 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 117671) Time Spent: 3h 50m (was: 3h 40m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 3h 50m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117669=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117669 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 29/Jun/18 23:41 Start Date: 29/Jun/18 23:41 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199302720 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -55,6 +83,127 @@ } } + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner = + new ValueCombiner<>( + GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); + + groupingTable = + GroupingTables.combiningAndSampling( + new WindowingCoderGroupingKeyCreator<>(keyCoder), + PairInfo.create(), + valueCombiner, + new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), + new CoderSizeEstimator<>(accumCoder), + 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + } + + /** A factory for {@link PrecombineRunner}s. 
*/ + private static class PrecombineFactory + implements PTransformRunnerFactory> { + +@Override +public PrecombineRunner createRunnerForPTransform( +PipelineOptions pipelineOptions, +BeamFnDataClient beamFnDataClient, +BeamFnStateClient beamFnStateClient, +String pTransformId, +PTransform pTransform, +Supplier processBundleInstructionId, +Map pCollections, +Map coders, +Map windowingStrategies, +Multimap>> pCollectionIdsToConsumers, +Consumer addStartFunction, +Consumer addFinishFunction, +BundleSplitListener splitListener) +throws IOException { + // Get objects needed to create the runner. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllWindowingStrategies(windowingStrategies) + .build()); + String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet()); + RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); + WindowedValueCoder> inputCoder = + (WindowedValueCoder>) + rehydratedComponents.getCoder(mainInput.getCoderId()); + Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder(); + + CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); + CombineFn combineFn = + (CombineFn) + SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + Coder accumCoder = + (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId()); + + Collection>>> consumers = + (Collection) + pCollectionIdsToConsumers.get( + Iterables.getOnlyElement(pTransform.getOutputsMap().values())); + + // Create the runner. + PrecombineRunner runner = + new PrecombineRunner( + pipelineOptions, + combineFn, + MultiplexingFnDataReceiver.forConsumers(consumers), + keyCoder, + accumCoder); + + // Register the appropriate handlers. 
+ addStartFunction.accept(runner::startBundle); Review comment: Fixed This is an
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117670=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117670 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 29/Jun/18 23:41 Start Date: 29/Jun/18 23:41 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r199302724 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -55,6 +83,127 @@ } } + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner = + new ValueCombiner<>( + GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); + + groupingTable = + GroupingTables.combiningAndSampling( + new WindowingCoderGroupingKeyCreator<>(keyCoder), + PairInfo.create(), + valueCombiner, + new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), + new CoderSizeEstimator<>(accumCoder), + 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + } + + /** A factory for {@link PrecombineRunner}s. 
*/ + private static class PrecombineFactory + implements PTransformRunnerFactory> { + +@Override +public PrecombineRunner createRunnerForPTransform( +PipelineOptions pipelineOptions, +BeamFnDataClient beamFnDataClient, +BeamFnStateClient beamFnStateClient, +String pTransformId, +PTransform pTransform, +Supplier processBundleInstructionId, +Map pCollections, +Map coders, +Map windowingStrategies, +Multimap>> pCollectionIdsToConsumers, +Consumer addStartFunction, +Consumer addFinishFunction, +BundleSplitListener splitListener) +throws IOException { + // Get objects needed to create the runner. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllWindowingStrategies(windowingStrategies) + .build()); + String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet()); + RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); + WindowedValueCoder> inputCoder = + (WindowedValueCoder>) + rehydratedComponents.getCoder(mainInput.getCoderId()); + Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder(); + + CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); + CombineFn combineFn = + (CombineFn) + SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + Coder accumCoder = + (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId()); + + Collection>>> consumers = + (Collection) + pCollectionIdsToConsumers.get( + Iterables.getOnlyElement(pTransform.getOutputsMap().values())); + + // Create the runner. + PrecombineRunner runner = + new PrecombineRunner( + pipelineOptions, + combineFn, + MultiplexingFnDataReceiver.forConsumers(consumers), + keyCoder, + accumCoder); + + // Register the appropriate handlers. 
+ addStartFunction.accept(runner::startBundle); + pCollectionIdsToConsumers.put( + Iterables.getOnlyElement(pTransform.getInputsMap().values()),
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116644=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116644 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 27/Jun/18 22:53 Start Date: 27/Jun/18 22:53 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r198663598 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -55,6 +83,127 @@ } } + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner = + new ValueCombiner<>( + GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); + + groupingTable = + GroupingTables.combiningAndSampling( + new WindowingCoderGroupingKeyCreator<>(keyCoder), + PairInfo.create(), + valueCombiner, + new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), + new CoderSizeEstimator<>(accumCoder), + 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + } + + /** A factory for {@link PrecombineRunner}s. 
*/ + private static class PrecombineFactory + implements PTransformRunnerFactory> { + +@Override +public PrecombineRunner createRunnerForPTransform( +PipelineOptions pipelineOptions, +BeamFnDataClient beamFnDataClient, +BeamFnStateClient beamFnStateClient, +String pTransformId, +PTransform pTransform, +Supplier processBundleInstructionId, +Map pCollections, +Map coders, +Map windowingStrategies, +Multimap>> pCollectionIdsToConsumers, +Consumer addStartFunction, +Consumer addFinishFunction, +BundleSplitListener splitListener) +throws IOException { + // Get objects needed to create the runner. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllWindowingStrategies(windowingStrategies) + .build()); + String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet()); + RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); + WindowedValueCoder> inputCoder = + (WindowedValueCoder>) + rehydratedComponents.getCoder(mainInput.getCoderId()); + Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder(); + + CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); + CombineFn combineFn = + (CombineFn) + SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + Coder accumCoder = + (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId()); + + Collection>>> consumers = + (Collection) + pCollectionIdsToConsumers.get( + Iterables.getOnlyElement(pTransform.getOutputsMap().values())); + + // Create the runner. + PrecombineRunner runner = + new PrecombineRunner( + pipelineOptions, + combineFn, + MultiplexingFnDataReceiver.forConsumers(consumers), + keyCoder, + accumCoder); + + // Register the appropriate handlers. 
+ addStartFunction.accept(runner::startBundle); + pCollectionIdsToConsumers.put( + Iterables.getOnlyElement(pTransform.getInputsMap().values()),
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116645=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116645 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 27/Jun/18 22:53 Start Date: 27/Jun/18 22:53 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#discussion_r198663384 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -55,6 +83,127 @@ } } + private static class PrecombineRunner { +private PipelineOptions options; +private CombineFn combineFn; +private FnDataReceiver>> output; +private Coder keyCoder; +private GroupingTable, InputT, AccumT> groupingTable; +private Coder accumCoder; + +PrecombineRunner( +PipelineOptions options, +CombineFn combineFn, +FnDataReceiver>> output, +Coder keyCoder, +Coder accumCoder) { + this.options = options; + this.combineFn = combineFn; + this.output = output; + this.keyCoder = keyCoder; + this.accumCoder = accumCoder; +} + +void startBundle() { + GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner = + new ValueCombiner<>( + GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options); + + groupingTable = + GroupingTables.combiningAndSampling( + new WindowingCoderGroupingKeyCreator<>(keyCoder), + PairInfo.create(), + valueCombiner, + new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), + new CoderSizeEstimator<>(accumCoder), + 0.001 /*sizeEstimatorSampleRate*/); +} + +void processElement(WindowedValue> elem) throws Exception { + groupingTable.put( + elem, + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + +void finishBundle() throws Exception { + groupingTable.flush( + (Object outputElem) -> + output.accept((WindowedValue>) outputElem) + ); +} + } + + /** A factory for {@link PrecombineRunner}s. 
*/ + private static class PrecombineFactory + implements PTransformRunnerFactory> { + +@Override +public PrecombineRunner createRunnerForPTransform( +PipelineOptions pipelineOptions, +BeamFnDataClient beamFnDataClient, +BeamFnStateClient beamFnStateClient, +String pTransformId, +PTransform pTransform, +Supplier processBundleInstructionId, +Map pCollections, +Map coders, +Map windowingStrategies, +Multimap>> pCollectionIdsToConsumers, +Consumer addStartFunction, +Consumer addFinishFunction, +BundleSplitListener splitListener) +throws IOException { + // Get objects needed to create the runner. + RehydratedComponents rehydratedComponents = + RehydratedComponents.forComponents( + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllWindowingStrategies(windowingStrategies) + .build()); + String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet()); + RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); + WindowedValueCoder> inputCoder = + (WindowedValueCoder>) + rehydratedComponents.getCoder(mainInput.getCoderId()); + Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder(); + + CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); + CombineFn combineFn = + (CombineFn) + SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + Coder accumCoder = + (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId()); + + Collection>>> consumers = + (Collection) + pCollectionIdsToConsumers.get( + Iterables.getOnlyElement(pTransform.getOutputsMap().values())); + + // Create the runner. + PrecombineRunner runner = + new PrecombineRunner( + pipelineOptions, + combineFn, + MultiplexingFnDataReceiver.forConsumers(consumers), + keyCoder, + accumCoder); + + // Register the appropriate handlers. 
+ addStartFunction.accept(runner::startBundle); Review comment: You add the start function twice.
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116621 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 27/Jun/18 21:31 Start Date: 27/Jun/18 21:31 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795#issuecomment-400835731 R: @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 116621) Time Spent: 3h (was: 2h 50m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 3h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116619=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116619 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 27/Jun/18 21:29 Start Date: 27/Jun/18 21:29 Worklog Time Spent: 10m Work Description: youngoli opened a new pull request #5795: [BEAM-3708] Adding grouping table to Precombine step. URL: https://github.com/apache/beam/pull/5795 Adding a grouping table to the Precombine step of a lifted Combine Per Key. This enables performing a Partial Group by Key optimization. The grouping table code is somewhat generic, so it can be reused in other runners that want to perform a Partial Group by Key. Note for reviewers: I wasn't entirely sure where to commit the GroupingTable code, since it is somewhat generic. For now I'm putting it in the most specific directory it fits in, but I may move the GroupingTable files to a new sub-directory (e.g. "utils" or something similar), or to a completely different directory if anyone has a suggestion. Follow this checklist to help us incorporate your contribution quickly and easily: - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Spark --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | --- | --- | --- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 116619) Time Spent: 2h 50m (was: 2h 40m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL:
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116054=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116054 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 26/Jun/18 17:31 Start Date: 26/Jun/18 17:31 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java new file mode 100644 index 000..849ebf41d56 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.sdk.fn.function.ThrowingFunction; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; + +/** + * Executes different components of Combine PTransforms. + */ +public class CombineRunners { + + /** + * A registrar which provides a factory to handle combine component PTransforms. + */ + @AutoService(PTransformRunnerFactory.Registrar.class) + public static class Registrar implements PTransformRunnerFactory.Registrar { + +@Override +public Map getPTransformRunnerFactories() { + return ImmutableMap.of( + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE), + MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction), + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS), + MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction), + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS), + MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction), + BeamUrns.getUrn(StandardPTransforms.Composites.COMBINE_GROUPED_VALUES), + MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction)); +} + } + + static + ThrowingFunction, KV> createPrecombineMapFunction( + String pTransformId, PTransform pTransform) throws IOException { +CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); 
+CombineFn combineFn = +(CombineFn) +SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + +return (KV input) -> +KV.of(input.getKey(), combineFn.addInput(combineFn.createAccumulator(), input.getValue())); + } + + static + ThrowingFunction>, KV> + createMergeAccumulatorsMapFunction(String pTransformId, PTransform pTransform) + throws IOException { +CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload()); +CombineFn combineFn = +(CombineFn) +SerializableUtils.deserializeFromByteArray( + combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); + +return (KV> input) -> +KV.of(input.getKey(),
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116053=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116053 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 26/Jun/18 17:30 Start Date: 26/Jun/18 17:30 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r198233212 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java ## @@ -157,16 +155,15 @@ public void testFlattenWithDuplicateInputCollectionProducesMultipleOutputs() thr assertThat(consumers.keySet(), containsInAnyOrder("inputATarget", "mainOutputTarget")); assertThat(consumers.get("inputATarget"), hasSize(2)); -Iterator>> targets = consumers.get("inputATarget").iterator(); -FnDataReceiver> first = targets.next(); -FnDataReceiver> second = targets.next(); +FnDataReceiver> input = +MultiplexingFnDataReceiver.forConsumers(consumers.get("inputATarget")); // Both of these are the flatten consumer -assertThat(first, equalTo(second)); +//assertThat(first, equalTo(second)); Review comment: minor note: I think you meant to delete these 3 commented lines. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 116053) Time Spent: 2.5h (was: 2h 20m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 2.5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116042=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116042 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 26/Jun/18 17:19 Start Date: 26/Jun/18 17:19 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-400395190 I think this is ready to be merged. The failed check below comes from an experimental precommit job that seems to have been run against this PR accidentally while someone was testing it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 116042) Time Spent: 2h 20m (was: 2h 10m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 2h 20m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116009 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 26/Jun/18 16:04 Start Date: 26/Jun/18 16:04 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-400367762 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 116009) Time Spent: 2h 10m (was: 2h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 2h 10m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114959=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114959 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 22/Jun/18 22:42 Start Date: 22/Jun/18 22:42 Worklog Time Spent: 10m Work Description: youngoli removed a comment on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-399592699 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114959) Time Spent: 2h (was: 1h 50m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 2h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114957 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 22/Jun/18 22:41 Start Date: 22/Jun/18 22:41 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-399602351 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114957) Time Spent: 1h 40m (was: 1.5h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 1h 40m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114958=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114958 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 22/Jun/18 22:41 Start Date: 22/Jun/18 22:41 Worklog Time Spent: 10m Work Description: youngoli removed a comment on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-399591064 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114958) Time Spent: 1h 50m (was: 1h 40m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 1h 50m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114945 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 22/Jun/18 21:53 Start Date: 22/Jun/18 21:53 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-399592699 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114945) Time Spent: 1.5h (was: 1h 20m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 1.5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114935 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 22/Jun/18 21:45 Start Date: 22/Jun/18 21:45 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-399591064 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114935) Time Spent: 1h 20m (was: 1h 10m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 1h 20m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113974 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196970538 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.number.IsCloseTo.closeTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.function.ThrowingRunnable; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + + +/** + * Tests for {@link CombineRunners}. 
+ */ +@RunWith(JUnit4.class) +public class CombineRunnersTest { + private static class TestCombineFn extends CombineFn { + +@Override +public Double createAccumulator() { + return 0.0; +} + +@Override +public Double addInput(Double accum, Integer input) { + accum += input; + return accum; +} + +@Override +public Double mergeAccumulators(Iterable accums) { + Double merged = 0.0; + for (Double accum : accums) { +merged += accum; + } + return merged; +} + +@Override +public String extractOutput(Double accum) { + return accum.toString(); +} + } + + private static final String TEST_COMBINE_ID = "combineId"; + + private RunnerApi.PTransform pTransform; + private String inputPCollectionId; + private String outputPCollectionId; + + @Before + public void createPipeline() throws Exception { +// Create pipeline with an input pCollection, combine, and output pCollection. +TestCombineFn combineFn = new TestCombineFn(); +Combine.PerKey combine = Combine.perKey(combineFn); + +Pipeline p = Pipeline.create(); +PCollection> inputPCollection = p.apply(Create.of(KV.of("unused", 0))); +inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); +PCollection> outputPCollection = inputPCollection +.apply(TEST_COMBINE_ID, combine); +outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + +// Create FnApi protos needed for the runner. +SdkComponents sdkComponents = SdkComponents.create(); +RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); +inputPCollectionId =
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113972=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113972 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196969501 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.number.IsCloseTo.closeTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.function.ThrowingRunnable; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + Review comment: nit: remove extra white space. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113972) Time Spent: 1h (was: 50m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 1h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113973 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196970955 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.number.IsCloseTo.closeTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.function.ThrowingRunnable; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + + +/** + * Tests for {@link CombineRunners}. 
+ */ +@RunWith(JUnit4.class) +public class CombineRunnersTest { + private static class TestCombineFn extends CombineFn { + +@Override +public Double createAccumulator() { + return 0.0; +} + +@Override +public Double addInput(Double accum, Integer input) { + accum += input; + return accum; +} + +@Override +public Double mergeAccumulators(Iterable accums) { + Double merged = 0.0; + for (Double accum : accums) { +merged += accum; + } + return merged; +} + +@Override +public String extractOutput(Double accum) { + return accum.toString(); +} + } + + private static final String TEST_COMBINE_ID = "combineId"; + + private RunnerApi.PTransform pTransform; + private String inputPCollectionId; + private String outputPCollectionId; + + @Before + public void createPipeline() throws Exception { +// Create pipeline with an input pCollection, combine, and output pCollection. +TestCombineFn combineFn = new TestCombineFn(); +Combine.PerKey combine = Combine.perKey(combineFn); + +Pipeline p = Pipeline.create(); +PCollection> inputPCollection = p.apply(Create.of(KV.of("unused", 0))); +inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); +PCollection> outputPCollection = inputPCollection +.apply(TEST_COMBINE_ID, combine); +outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + +// Create FnApi protos needed for the runner. +SdkComponents sdkComponents = SdkComponents.create(); +RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); +inputPCollectionId =
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113971=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113971 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196971687 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java ## @@ -109,7 +109,7 @@ public void testCreatingAndProcessingDoFlatten() throws Exception { } /** - * Create a Flatten that has 4 inputs (inputATarget1, inputATarget2, inputBTarget, inputCTarget) + * Create a Flatten that has 4 inputs (inputATarget1, inputATarget1, inputATarget2, inputATarget2) Review comment: This flatten only has two inputs (`inputA`, `inputAAgain`) and not 4. Listing them isn't important to the test so consider saying this test consumes data from the same PCollection. Also, on line 158, consider using MultiplexingFnDataReceiver.forConsumers to have a single target instead of invoking each target separately. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113971) Time Spent: 50m (was: 40m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 50m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. 
These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113969=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113969 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196966468 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.sdk.fn.function.ThrowingFunction; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; + +/** + * Executes different components of Combine PTransforms. + */ +public class CombineRunners { + + private static final String PRECOMBINE_URN = + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE); + private static final String MERGE_ACCUMULATORS_URN = + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS); + private static final String EXTRACT_OUTPUTS_URN = + BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS); + private static final String COMBINE_GROUPED_VALUES_URN = + BeamUrns.getUrn(StandardPTransforms.Composites.COMBINE_GROUPED_VALUES); + + /** + * A registrar which provides a factory to handle pre-combine PTransforms. Review comment: nit: comment says this is only about pre-combine but in reality it's about the combine composite being expanded. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113969) Time Spent: 40m (was: 0.5h) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 40m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113970=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113970 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 20/Jun/18 23:24 Start Date: 20/Jun/18 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#discussion_r196966761 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.sdk.fn.function.ThrowingFunction; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; + +/** + * Executes different components of Combine PTransforms. + */ +public class CombineRunners { + + private static final String PRECOMBINE_URN = Review comment: Not much value in declaring these constants here again since they are used in this class only once so directly reference them there. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113970) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 40m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111320=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111320 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 12/Jun/18 23:09 Start Date: 12/Jun/18 23:09 Worklog Time Spent: 10m Work Description: youngoli opened a new pull request #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622 Adding implementations for the components represented by the Combine URNs described in the portable combines doc: https://s.apache.org/beam-runner-api-combine-model This is just an initial implementation so it's lacking one major feature which is an optimization in the Precombine. Planning to add that right away, but want to get this in first. Follow this checklist to help us incorporate your contribution quickly and easily: - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111320) Time Spent: 10m Remaining Estimate: 0h > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 10m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111323=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111323 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 12/Jun/18 23:09 Start Date: 12/Jun/18 23:09 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-396762786 R: @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111323) Time Spent: 0.5h (was: 20m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 0.5h > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK
[ https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111322=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111322 ] ASF GitHub Bot logged work on BEAM-3708: Author: ASF GitHub Bot Created on: 12/Jun/18 23:09 Start Date: 12/Jun/18 23:09 Worklog Time Spent: 10m Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding Combine component implementations to Java SDK URL: https://github.com/apache/beam/pull/5622#issuecomment-396762786 @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111322) Time Spent: 20m (was: 10m) > Implement the portable lifted Combiner transforms in Java SDK > - > > Key: BEAM-3708 > URL: https://issues.apache.org/jira/browse/BEAM-3708 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core, sdk-java-harness >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Labels: portability > Time Spent: 20m > Remaining Estimate: 0h > > Lifted combines are split into separate parts with different URNs. These > parts need to be implemented in the Java SDK harness so that the SDK can > actually execute them when receiving Combine transforms with the > corresponding URNs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)