[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118339=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118339
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:33
Start Date: 02/Jul/18 18:33
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of such a pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 35399b92e4c..178c4c1c0d8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -18,20 +18,40 @@
 package org.apache.beam.fn.harness;
 
 import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.control.BundleSplitListener;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
 import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 
 /** Executes different components of Combine PTransforms. */
-public class CombineRunners {
+public class CombineRunners {
 
   /** A registrar which provides a factory to handle combine component PTransforms. */
   @AutoService(PTransformRunnerFactory.Registrar.class)
@@ -41,7 +61,7 @@
 public Map getPTransformRunnerFactories() {
   return ImmutableMap.of(
   BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE),
-  MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction),
+  new PrecombineFactory(),
   BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS),
   MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction),
   BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS),
@@ -51,17 +71,120 @@
 }
   }
 
-  static 
-  ThrowingFunction, KV> createPrecombineMapFunction(
-  String pTransformId, PTransform pTransform) throws IOException {
-CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
-CombineFn combineFn =
-(CombineFn)
-SerializableUtils.deserializeFromByteArray(
-combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118312=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118312
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303656

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+/** An interface that groups inputs to an accumulator and flushes the output. */
+public interface GroupingTable {
+
+  /** Abstract interface of things that accept inputs one at a time via process(). */
+  interface Receiver {
 
 Review comment:
   I don't think we'll need to make this generic in this sense. Consider using 
`FnDataReceiver` directly here instead of `Receiver`.
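
To make the suggestion above concrete, here is a minimal sketch of a table interface that takes a typed receiver directly instead of a nested `Receiver` with an `Object` parameter. The `FnDataReceiver` declared here is a local stand-in that only assumes a one-method `accept` shape; `SimpleGroupingTable`, `BufferingTable`, and `run` are hypothetical names for illustration and are not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class TypedReceiverSketch {
  /** Local stand-in (assumption) for a typed receiver with a single accept method. */
  interface FnDataReceiver<T> {
    void accept(T input) throws Exception;
  }

  /** Hypothetical, simplified table interface that uses the typed receiver directly. */
  interface SimpleGroupingTable<InT, OutT> {
    void put(InT value, FnDataReceiver<OutT> receiver) throws Exception;

    void flush(FnDataReceiver<OutT> receiver) throws Exception;
  }

  /** Toy implementation: buffers every value and emits them unchanged on flush. */
  static class BufferingTable implements SimpleGroupingTable<String, String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void put(String value, FnDataReceiver<String> receiver) {
      buffer.add(value);
    }

    @Override
    public void flush(FnDataReceiver<String> receiver) throws Exception {
      for (String value : buffer) {
        receiver.accept(value);
      }
      buffer.clear();
    }
  }

  /** Feeds two values through the table and returns what the receiver saw. */
  public static List<String> run() {
    List<String> out = new ArrayList<>();
    BufferingTable table = new BufferingTable();
    try {
      table.put("a", out::add);
      table.put("b", out::add);
      table.flush(out::add);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

With a typed receiver, callers can pass `out::add` without any cast, which is the practical benefit the comment seems to be pointing at.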


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 118312)
Time Spent: 4h 40m  (was: 4.5h)

> Implement the portable lifted Combiner transforms in Java SDK
> -
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-harness
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These 
> parts need to be implemented in the Java SDK harness so that the SDK can 
> actually execute them when receiving Combine transforms with the 
> corresponding URNs.
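
As a rough illustration of the split described above, the sketch below models the three stages named by the URNs in the diff (precombine, merge accumulators, extract outputs) for a simple mean. It is a plain-Java sketch under stated assumptions: the class, the `{sum, count}` accumulator layout, and all method names are hypothetical, and it does not use the Beam `CombineFn` API or keyed elements.

```java
import java.util.Arrays;
import java.util.List;

public class LiftedCombineSketch {
  /** Precombine: fold raw inputs into a partial accumulator {sum, count}. */
  public static long[] precombine(int... inputs) {
    long[] accum = {0L, 0L};
    for (int value : inputs) {
      accum[0] += value;
      accum[1] += 1;
    }
    return accum;
  }

  /** Merge accumulators: combine the partial accumulators produced upstream. */
  public static long[] mergeAccumulators(List<long[]> accums) {
    long[] merged = {0L, 0L};
    for (long[] accum : accums) {
      merged[0] += accum[0];
      merged[1] += accum[1];
    }
    return merged;
  }

  /** Extract outputs: turn the final accumulator into the result (here, a mean). */
  public static double extractOutput(long[] accum) {
    return (double) accum[0] / accum[1];
  }

  /** Runs the three stages over two bundles of inputs. */
  public static double meanOf(int[] firstBundle, int[] secondBundle) {
    long[] a = precombine(firstBundle);
    long[] b = precombine(secondBundle);
    return extractOutput(mergeAccumulators(Arrays.asList(a, b)));
  }

  public static void main(String[] args) {
    System.out.println(meanOf(new int[] {1, 2, 3}, new int[] {5}));
  }
}
```

The point of the split is that the first stage can run before a shuffle and shrink the data, while the later stages only ever see accumulators.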



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118317=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118317
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199305122

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 ##
 @@ -0,0 +1,568 @@
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations. */
+public class PrecombineGroupingTable
+implements GroupingTable {
+  /** Returns a {@link GroupingTable} that combines inputs into a accumulator. */
+  public static  GroupingTable, InputT, AccumT> combining(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+new CoderSizeEstimator<>(accumulatorCoder));
+  }
+
+  /**
+   * Returns a {@link GroupingTable} that combines inputs into a accumulator with sampling {@link
+   * SizeEstimator SizeEstimators}.
+   */
+  public static 
+  GroupingTable, InputT, AccumT> combiningAndSampling(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder,
+  double sizeEstimatorSampleRate) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+sizeEstimatorSampleRate,
+1.0),
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
+  }
+
+  /** Provides client-specific operations for grouping keys. */
+  public interface GroupingKeyCreator<K> {
+Object createGroupingKey(K key) throws Exception;
+  }
+
+  /** Implements Precombine GroupingKeyCreator via Coder. */
+  public static class WindowingCoderGroupingKeyCreator
+  implements GroupingKeyCreator> {
+
+private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+private final Coder coder;
+
+

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118315
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303702

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
 ##
 @@ -0,0 +1,35 @@
+package org.apache.beam.fn.harness;
+
+/** An interface that groups inputs to an accumulator and flushes the output. */
+public interface GroupingTable {
+
+  /** Abstract interface of things that accept inputs one at a time via process(). */
+  interface Receiver {
+/** Processes the element. */
+void process(Object outputElem) throws Exception;
+  }
+
+  /** Adds a pair to this table, possibly flushing some entries to output if the table is full. */
+  void put(Object pair, Receiver receiver) throws Exception;
 
 Review comment:
   You can use `KV`
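
The `put` contract quoted above (combine into the table, flush some entries when it is full) can be sketched with typed keys and values rather than `Object`. This is a simplified illustration, not the PR's implementation: it caps the table by entry count instead of estimated bytes, uses `Map.Entry` as a stand-in for a key/value pair, and `BoundedCombiningTable`, `maxEntries`, and `demo` are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

public class BoundedCombiningTable<K, V> {
  private final int maxEntries; // hypothetical cap; counts entries rather than bytes
  private final BinaryOperator<V> combiner;
  private final Map<K, V> table = new LinkedHashMap<>();

  public BoundedCombiningTable(int maxEntries, BinaryOperator<V> combiner) {
    this.maxEntries = maxEntries;
    this.combiner = combiner;
  }

  /** Combines the value into its key's entry; evicts the oldest entry if over the cap. */
  public void put(K key, V value, Consumer<Map.Entry<K, V>> receiver) {
    table.merge(key, value, combiner);
    if (table.size() > maxEntries) {
      Iterator<Map.Entry<K, V>> it = table.entrySet().iterator();
      Map.Entry<K, V> oldest = it.next();
      // Copy before removal so the emitted pair does not depend on the map's internals.
      Map.Entry<K, V> evicted = new AbstractMap.SimpleImmutableEntry<>(oldest);
      it.remove();
      receiver.accept(evicted);
    }
  }

  /** Emits every remaining entry and empties the table. */
  public void flush(Consumer<Map.Entry<K, V>> receiver) {
    for (Map.Entry<K, V> entry : table.entrySet()) {
      receiver.accept(new AbstractMap.SimpleImmutableEntry<>(entry));
    }
    table.clear();
  }

  /** Small demonstration: sums integers per key with room for two keys. */
  public static List<String> demo() {
    List<String> emitted = new ArrayList<>();
    Consumer<Map.Entry<String, Integer>> receiver =
        e -> emitted.add(e.getKey() + "=" + e.getValue());
    BoundedCombiningTable<String, Integer> t = new BoundedCombiningTable<>(2, Integer::sum);
    t.put("a", 1, receiver);
    t.put("a", 2, receiver);
    t.put("b", 5, receiver);
    t.put("c", 7, receiver); // third key exceeds the cap, so "a=3" is evicted
    t.flush(receiver);
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because partial results can be evicted at any time, whatever consumes the output still has to merge accumulators for the same key downstream.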




Issue Time Tracking
---

Worklog Id: (was: 118315)

> Implement the portable lifted Combiner transforms in Java SDK
> -
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-harness
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118318=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118318
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199577433

 ##
 File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
 ##
 @@ -93,6 +93,7 @@ public Integer extractOutput(Integer accum) {
   private RunnerApi.PTransform pTransform;
   private String inputPCollectionId;
   private String outputPCollectionId;
+  private RunnerApi.Pipeline pProto;
 
 Review comment:
   nit: `pProto` -> `pipeline` or `pipelineProto`




Issue Time Tracking
---

Worklog Id: (was: 118318)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118316=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118316
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303794

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 ##
 @@ -0,0 +1,568 @@
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations. */
+public class PrecombineGroupingTable
 
 Review comment:
   Consider calling this class `GroupingTables`.
   
   I don't know how much re-use we will get on this class, you can make it an 
internal detail of `CombineRunners` instead of exposing it.




Issue Time Tracking
---

Worklog Id: (was: 118316)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118311=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118311
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303571

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -51,17 +71,120 @@
 }
   }
 
-  static 
-  ThrowingFunction, KV> createPrecombineMapFunction(
-  String pTransformId, PTransform pTransform) throws IOException {
-CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
-CombineFn combineFn =
-(CombineFn)
-SerializableUtils.deserializeFromByteArray(
-combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  groupingTable =
+  PrecombineGroupingTable.combiningAndSampling(
+  options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception {
+  groupingTable.put(
+  elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem));
 
 Review comment:
   if you use a cast, you should be able to pass this in as a method reference 
instead of using a lambda
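
One possible reading of this suggestion is sketched below: cast the typed receiver once to an `Object`-typed view, then pass `accept` as a method reference instead of casting each element inside a lambda. The two interfaces are local stand-ins (assumptions) shaped like the `Receiver` and typed receiver discussed in this thread, and `MethodRefCastSketch` and `run` are hypothetical names; the unchecked cast is only safe here because every element really is a `String`.

```java
import java.util.ArrayList;
import java.util.List;

public class MethodRefCastSketch {
  /** Untyped receiver in the shape quoted in the diff above. */
  interface Receiver {
    void process(Object outputElem) throws Exception;
  }

  /** Local stand-in (assumption) for a typed receiver. */
  interface FnDataReceiver<T> {
    void accept(T input) throws Exception;
  }

  /** Sends one element through each form and returns what the sink collected. */
  public static List<String> run() {
    List<String> sink = new ArrayList<>();
    FnDataReceiver<String> output = sink::add;

    // Lambda form: cast each element before handing it on.
    Receiver viaLambda = (Object elem) -> output.accept((String) elem);

    // Method-reference form: cast the receiver once, then refer to accept directly.
    @SuppressWarnings("unchecked")
    FnDataReceiver<Object> untyped = (FnDataReceiver<Object>) (FnDataReceiver<?>) output;
    Receiver viaMethodRef = untyped::accept;

    try {
      viaLambda.process("x");
      viaMethodRef.process("y");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return sink;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Both forms behave the same for well-typed elements; the method-reference form trades a per-element cast for a single unchecked cast on the receiver.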




Issue Time Tracking
---

Worklog Id: (was: 118311)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118310
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303366

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -51,17 +71,120 @@
 }
   }
 
-  static 
-  ThrowingFunction, KV> createPrecombineMapFunction(
-  String pTransformId, PTransform pTransform) throws IOException {
-CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
-CombineFn combineFn =
-(CombineFn)
-SerializableUtils.deserializeFromByteArray(
-combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  groupingTable =
+  PrecombineGroupingTable.combiningAndSampling(
+  options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception {
+  groupingTable.put(
+  elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem));
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) -> output.accept((WindowedValue>) outputElem));
+}
+  }
+
+  /** A factory for {@link PrecombineRunner}s. */
+  @VisibleForTesting
+  public static class PrecombineFactory
+  implements PTransformRunnerFactory> {
+
+@Override
+public PrecombineRunner createRunnerForPTransform(
+PipelineOptions pipelineOptions,
+BeamFnDataClient beamFnDataClient,
+BeamFnStateClient beamFnStateClient,
+String pTransformId,
+PTransform pTransform,
+Supplier processBundleInstructionId,
+Map pCollections,
+Map coders,
+Map windowingStrategies,
+Multimap>> pCollectionIdsToConsumers,
+Consumer addStartFunction,
+Consumer addFinishFunction,
+BundleSplitListener splitListener)
+throws IOException {
+  // Get objects needed to create the runner.
+  RehydratedComponents rehydratedComponents =
+  RehydratedComponents.forComponents(
+  RunnerApi.Components.newBuilder()
+  .putAllCoders(coders)
+  .putAllWindowingStrategies(windowingStrategies)
+  .build());
+  String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+  RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+
+  // Input coder may sometimes be WindowedValueCoder depending on runner, instead of the
+  // expected KvCoder.
+  Coder uncastInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
+  KvCoder inputCoder;
 
 Review comment:
   You don't use the `inputCoder` anywhere except to get the key coder.
   
   Consider dropping the local variable `inputCoder` and setting `keyCoder` 
directly.




Issue Time Tracking
---

Worklog Id: (was: 118310)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118321=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118321
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: [BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199305044

 ##
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 ##
 @@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations. 
*/
+public class PrecombineGroupingTable
+implements GroupingTable {
+  /** Returns a {@link GroupingTable} that combines inputs into a accumulator. 
*/
+  public static  GroupingTable, InputT, 
AccumT> combining(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+new CoderSizeEstimator<>(accumulatorCoder));
+  }
+
+  /**
+   * Returns a {@link GroupingTable} that combines inputs into an accumulator with sampling {@link
+   * SizeEstimator SizeEstimators}.
+   */
+  public static 
+  GroupingTable, InputT, AccumT> combiningAndSampling(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder,
+  double sizeEstimatorSampleRate) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+sizeEstimatorSampleRate,
+1.0),
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
+  }
+
+  /** Provides client-specific operations for grouping keys. */
+  public interface GroupingKeyCreator {
+Object createGroupingKey(K key) throws Exception;
+  }
+
+  /** Implements Precombine GroupingKeyCreator via Coder. */
+  public static class WindowingCoderGroupingKeyCreator
+  implements GroupingKeyCreator> {
+
+private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+private final Coder coder;
+
+

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118323
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #5795: [BEAM-3708] Adding 
grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#issuecomment-401887478
 
 
   Run Java PreCommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 118323)
Time Spent: 5h 40m  (was: 5.5h)

> Implement the portable lifted Combiner transforms in Java SDK
> -
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-harness
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These 
> parts need to be implemented in the Java SDK harness so that the SDK can 
> actually execute them when receiving Combine transforms with the 
> corresponding URNs.
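
As a rough illustration of the split described above, a lifted combine can be pictured as three phases: a precombine that folds each bundle's inputs into a partial accumulator, a merge of those partial accumulators, and an extraction of the final output. The sketch below computes a mean this way in plain Java. The class and method names (LiftedMeanSketch, precombine, merge, extract) are hypothetical and are not Beam APIs; the issue text does not list the actual URNs, so none are shown.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Beam code: a mean computed in three "lifted" phases.
class LiftedMeanSketch {
  /** Phase 1 (precombine): fold one bundle's values into a {sum, count} accumulator. */
  static long[] precombine(int[] values) {
    long[] acc = {0L, 0L};
    for (int v : values) {
      acc[0] += v;
      acc[1]++;
    }
    return acc;
  }

  /** Phase 2 (merge accumulators): combine the partial accumulators from all bundles. */
  static long[] merge(List<long[]> accumulators) {
    long[] merged = {0L, 0L};
    for (long[] acc : accumulators) {
      merged[0] += acc[0];
      merged[1] += acc[1];
    }
    return merged;
  }

  /** Phase 3 (extract outputs): turn the merged accumulator into the final value. */
  static double extract(long[] acc) {
    return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
  }

  /** Runs two bundles through all three phases. */
  static double demo() {
    long[] bundleA = precombine(new int[] {1, 2, 3});
    long[] bundleB = precombine(new int[] {4, 6});
    return extract(merge(Arrays.asList(bundleA, bundleB)));
  }
}
```

Because only the small accumulators cross the phase boundary, the precombine phase can run before the shuffle and reduce the volume of data that has to be grouped.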



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118319
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199576986
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 ##
 @@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations. */
+public class PrecombineGroupingTable
+implements GroupingTable {
+  /** Returns a {@link GroupingTable} that combines inputs into an accumulator. */
+  public static  GroupingTable, InputT, AccumT> combining(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+new CoderSizeEstimator<>(accumulatorCoder));
+  }
+
+  /**
+   * Returns a {@link GroupingTable} that combines inputs into an accumulator with sampling {@link
+   * SizeEstimator SizeEstimators}.
+   */
+  public static 
+  GroupingTable, InputT, AccumT> combiningAndSampling(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder,
+  double sizeEstimatorSampleRate) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+sizeEstimatorSampleRate,
+1.0),
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
+  }
+
+  /** Provides client-specific operations for grouping keys. */
+  public interface GroupingKeyCreator {
+Object createGroupingKey(K key) throws Exception;
+  }
+
+  /** Implements Precombine GroupingKeyCreator via Coder. */
+  public static class WindowingCoderGroupingKeyCreator
+  implements GroupingKeyCreator> {
+
+private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+private final Coder coder;
+
+

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118313
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303580
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -51,17 +71,120 @@
 }
   }
 
-  static 
-  ThrowingFunction, KV> createPrecombineMapFunction(
-  String pTransformId, PTransform pTransform) throws IOException {
-CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
-CombineFn combineFn =
-(CombineFn)
-SerializableUtils.deserializeFromByteArray(
-combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  groupingTable =
+  PrecombineGroupingTable.combiningAndSampling(
+  options, combineFn, keyCoder, accumCoder, 0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception {
+  groupingTable.put(
+  elem, (Object outputElem) -> output.accept((WindowedValue>) outputElem));
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) -> output.accept((WindowedValue>) outputElem));
 
 Review comment:
   Ditto here: if you use a cast, you should be able to pass this in as a 
method reference instead of using a lambda.
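
A minimal sketch of what this suggestion could look like, assuming stand-in types: Receiver and Sink below are hypothetical simplifications of GroupingTable.Receiver and FnDataReceiver, and the code actually merged in the PR may differ. The lambda form casts every element, while the method-reference form casts the receiver once and then passes its accept method directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Beam code: lambda with a per-element cast versus
// a single cast of the sink followed by a method reference.
class MethodRefSketch {
  /** Stand-in for an untyped receiver such as GroupingTable.Receiver. */
  interface Receiver {
    void process(Object elem) throws Exception;
  }

  /** Stand-in for a typed consumer such as FnDataReceiver. */
  interface Sink<T> {
    void accept(T elem) throws Exception;
  }

  /** Hands every pending element to the receiver. */
  static void flush(List<Object> pending, Receiver receiver) throws Exception {
    for (Object elem : pending) {
      receiver.process(elem);
    }
  }

  @SuppressWarnings("unchecked")
  static List<String> run() throws Exception {
    List<String> seen = new ArrayList<>();
    Sink<String> output = seen::add;

    List<Object> pending = new ArrayList<>();
    pending.add("a");
    pending.add("b");

    // Lambda form: each element is cast before it reaches the typed sink.
    flush(pending, (Object elem) -> output.accept((String) elem));

    // Method-reference form: cast the sink once (an unchecked cast), then
    // pass its accept method as the receiver.
    Sink<Object> untyped = (Sink<Object>) (Sink<?>) output;
    flush(pending, untyped::accept);

    return seen;
  }
}
```

The unchecked cast trades compile-time type safety for brevity, so it is only reasonable when the caller already knows the element type that the table emits.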


Issue Time Tracking
---

Worklog Id: (was: 118313)
Time Spent: 4h 50m  (was: 4h 40m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118314&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118314
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199303721
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/GroupingTable.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+/** An interface that groups inputs to an accumulator and flushes the output. */
+public interface GroupingTable {
+
+  /** Abstract interface of things that accept inputs one at a time via process(). */
+  interface Receiver {
+/** Processes the element. */
+void process(Object outputElem) throws Exception;
+  }
+
+  /** Adds a pair to this table, possibly flushing some entries to output if the table is full. */
 
 Review comment:
   `pair` -> `keyed value`
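
To make the documented contract concrete, here is a small sketch of a table that accumulates keyed values and flushes to a receiver once it grows past a limit. It is a simplification under stated assumptions: TinyGroupingTable is a hypothetical class, it counts entries rather than estimating bytes, it sums integers instead of calling a CombineFn, and it flushes every entry when full rather than only some.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Beam code: a bounded table of per-key sums.
class TinyGroupingTable {
  /** Same shape as the Receiver interface in the diff above. */
  interface Receiver {
    void process(Object outputElem) throws Exception;
  }

  private final int maxEntries;
  private final Map<String, Integer> sums = new LinkedHashMap<>();

  TinyGroupingTable(int maxEntries) {
    this.maxEntries = maxEntries;
  }

  /** Adds a keyed value, flushing the table if it now holds too many keys. */
  void put(String key, int value, Receiver receiver) throws Exception {
    sums.merge(key, value, Integer::sum);
    if (sums.size() > maxEntries) {
      flush(receiver);
    }
  }

  /** Emits every accumulated entry as "key=sum" and empties the table. */
  void flush(Receiver receiver) throws Exception {
    for (Map.Entry<String, Integer> entry : sums.entrySet()) {
      receiver.process(entry.getKey() + "=" + entry.getValue());
    }
    sums.clear();
  }

  static List<Object> demo() throws Exception {
    List<Object> out = new ArrayList<>();
    TinyGroupingTable table = new TinyGroupingTable(2);
    table.put("a", 1, out::add);
    table.put("a", 2, out::add); // combined with the earlier value for "a"
    table.put("b", 5, out::add);
    table.put("c", 7, out::add); // third distinct key exceeds the limit and triggers a flush
    table.flush(out::add);       // end of bundle: nothing is left to emit
    return out;
  }
}
```

Values for a repeated key are combined in place, which is why "a" is emitted once with its sum rather than twice.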


Issue Time Tracking
---

Worklog Id: (was: 118314)
Time Spent: 5h  (was: 4h 50m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=118320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118320
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 02/Jul/18 18:06
Start Date: 02/Jul/18 18:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199304718
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PrecombineGroupingTable.java
 ##
 @@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.runners.core.GlobalCombineFnRunner;
+import org.apache.beam.runners.core.GlobalCombineFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/** Static utility methods that provide {@link GroupingTable} implementations. */
+public class PrecombineGroupingTable
+implements GroupingTable {
+  /** Returns a {@link GroupingTable} that combines inputs into an accumulator. */
+  public static  GroupingTable, InputT, AccumT> combining(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+new CoderSizeEstimator<>(accumulatorCoder));
+  }
+
+  /**
+   * Returns a {@link GroupingTable} that combines inputs into an accumulator with sampling {@link
+   * SizeEstimator SizeEstimators}.
+   */
+  public static 
+  GroupingTable, InputT, AccumT> combiningAndSampling(
+  PipelineOptions options,
+  CombineFn combineFn,
+  Coder keyCoder,
+  Coder accumulatorCoder,
+  double sizeEstimatorSampleRate) {
+Combiner, InputT, AccumT, ?> valueCombiner =
+new ValueCombiner<>(
+GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+return new PrecombineGroupingTable<>(
+DEFAULT_MAX_GROUPING_TABLE_BYTES,
+new WindowingCoderGroupingKeyCreator<>(keyCoder),
+WindowedPairInfo.create(),
+valueCombiner,
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+sizeEstimatorSampleRate,
+1.0),
+new SamplingSizeEstimator<>(
+new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
+  }
+
+  /** Provides client-specific operations for grouping keys. */
+  public interface GroupingKeyCreator {
+Object createGroupingKey(K key) throws Exception;
+  }
+
+  /** Implements Precombine GroupingKeyCreator via Coder. */
+  public static class WindowingCoderGroupingKeyCreator
+  implements GroupingKeyCreator> {
+
+private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+private final Coder coder;
+
+

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117683
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 29/Jun/18 23:59
Start Date: 29/Jun/18 23:59
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding 
grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#issuecomment-401501232
 
 
   Run Java PreCommit


Issue Time Tracking
---

Worklog Id: (was: 117683)
Time Spent: 4h 10m  (was: 4h)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117682&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117682
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 29/Jun/18 23:59
Start Date: 29/Jun/18 23:59
Worklog Time Spent: 10m 
  Work Description: youngoli removed a comment on issue #5795: [BEAM-3708] 
Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#issuecomment-401499487
 
 
   Run Java PreCommit


Issue Time Tracking
---

Worklog Id: (was: 117682)
Time Spent: 4h  (was: 3h 50m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117671&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117671
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 29/Jun/18 23:42
Start Date: 29/Jun/18 23:42
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding 
grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#issuecomment-401499487
 
 
   Run Java PreCommit


Issue Time Tracking
---

Worklog Id: (was: 117671)
Time Spent: 3h 50m  (was: 3h 40m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117669
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 29/Jun/18 23:41
Start Date: 29/Jun/18 23:41
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199302720
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -55,6 +83,127 @@
 }
   }
 
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner =
+  new ValueCombiner<>(
+  GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+
+  groupingTable =
+  GroupingTables.combiningAndSampling(
+  new WindowingCoderGroupingKeyCreator<>(keyCoder),
+  PairInfo.create(),
+  valueCombiner,
+  new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+  new CoderSizeEstimator<>(accumCoder),
+  0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception {
+  groupingTable.put(
+  elem,
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+  }
+
+  /** A factory for {@link PrecombineRunner}s. */
+  private static class PrecombineFactory
+  implements PTransformRunnerFactory> {
+
+@Override
+public PrecombineRunner createRunnerForPTransform(
+PipelineOptions pipelineOptions,
+BeamFnDataClient beamFnDataClient,
+BeamFnStateClient beamFnStateClient,
+String pTransformId,
+PTransform pTransform,
+Supplier processBundleInstructionId,
+Map pCollections,
+Map coders,
+Map windowingStrategies,
+Multimap>> pCollectionIdsToConsumers,
+Consumer addStartFunction,
+Consumer addFinishFunction,
+BundleSplitListener splitListener)
+throws IOException {
+  // Get objects needed to create the runner.
+  RehydratedComponents rehydratedComponents =
+  RehydratedComponents.forComponents(
+  RunnerApi.Components.newBuilder()
+  .putAllCoders(coders)
+  .putAllWindowingStrategies(windowingStrategies)
+  .build());
+  String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+  RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+  WindowedValueCoder> inputCoder =
+  (WindowedValueCoder>)
+  rehydratedComponents.getCoder(mainInput.getCoderId());
+  Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder();
+
+  CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+  CombineFn combineFn =
+  (CombineFn)
+  SerializableUtils.deserializeFromByteArray(
+  combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  Coder accumCoder =
+  (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());
+
+  Collection>>> consumers =
+  (Collection)
+  pCollectionIdsToConsumers.get(
+  Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
+
+  // Create the runner.
+  PrecombineRunner runner =
+  new PrecombineRunner(
+  pipelineOptions,
+  combineFn,
+  MultiplexingFnDataReceiver.forConsumers(consumers),
+  keyCoder,
+  accumCoder);
+
+  // Register the appropriate handlers.
+  addStartFunction.accept(runner::startBundle);
 
 Review comment:
   Fixed


This is an 

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=117670&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117670
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 29/Jun/18 23:41
Start Date: 29/Jun/18 23:41
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r199302724
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -55,6 +83,127 @@
 }
   }
 
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  GroupingTables.Combiner, InputT, AccumT, ?> valueCombiner =
+  new ValueCombiner<>(
+  GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
+
+  groupingTable =
+  GroupingTables.combiningAndSampling(
+  new WindowingCoderGroupingKeyCreator<>(keyCoder),
+  PairInfo.create(),
+  valueCombiner,
+  new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+  new CoderSizeEstimator<>(accumCoder),
+  0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception {
+  groupingTable.put(
+  elem,
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+  }
+
+  /** A factory for {@link PrecombineRunner}s. */
+  private static class PrecombineFactory
+  implements PTransformRunnerFactory> {
+
+@Override
+public PrecombineRunner createRunnerForPTransform(
+PipelineOptions pipelineOptions,
+BeamFnDataClient beamFnDataClient,
+BeamFnStateClient beamFnStateClient,
+String pTransformId,
+PTransform pTransform,
+Supplier processBundleInstructionId,
+Map pCollections,
+Map coders,
+Map windowingStrategies,
+Multimap>> pCollectionIdsToConsumers,
+Consumer addStartFunction,
+Consumer addFinishFunction,
+BundleSplitListener splitListener)
+throws IOException {
+  // Get objects needed to create the runner.
+  RehydratedComponents rehydratedComponents =
+  RehydratedComponents.forComponents(
+  RunnerApi.Components.newBuilder()
+  .putAllCoders(coders)
+  .putAllWindowingStrategies(windowingStrategies)
+  .build());
+  String mainInputTag = Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+  RunnerApi.PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+  WindowedValueCoder> inputCoder =
+  (WindowedValueCoder>)
+  rehydratedComponents.getCoder(mainInput.getCoderId());
+  Coder keyCoder = ((KvCoder) inputCoder.getValueCoder()).getKeyCoder();
+
+  CombinePayload combinePayload = CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+  CombineFn combineFn =
+  (CombineFn)
+  SerializableUtils.deserializeFromByteArray(
+  combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
+  Coder accumCoder =
+  (Coder) rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());
+
+  Collection>>> consumers =
+  (Collection)
+  pCollectionIdsToConsumers.get(
+  Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
+
+  // Create the runner.
+  PrecombineRunner runner =
+  new PrecombineRunner(
+  pipelineOptions,
+  combineFn,
+  MultiplexingFnDataReceiver.forConsumers(consumers),
+  keyCoder,
+  accumCoder);
+
+  // Register the appropriate handlers.
+  addStartFunction.accept(runner::startBundle);
+  pCollectionIdsToConsumers.put(
+  Iterables.getOnlyElement(pTransform.getInputsMap().values()),

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116644&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116644
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 27/Jun/18 22:53
Start Date: 27/Jun/18 22:53
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r198663598
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -55,6 +83,127 @@
 }
   }
 
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  GroupingTables.Combiner, InputT, AccumT, ?> 
valueCombiner =
+  new ValueCombiner<>(
+  GlobalCombineFnRunners.create(combineFn), 
NullSideInputReader.empty(), options);
+
+  groupingTable =
+  GroupingTables.combiningAndSampling(
+  new WindowingCoderGroupingKeyCreator<>(keyCoder),
+  PairInfo.create(),
+  valueCombiner,
+  new 
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+  new CoderSizeEstimator<>(accumCoder),
+  0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception 
{
+  groupingTable.put(
+  elem,
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+  }
+
+  /** A factory for {@link PrecombineRunner}s. */
+  private static class PrecombineFactory
+  implements PTransformRunnerFactory> {
+
+@Override
+public PrecombineRunner createRunnerForPTransform(
+PipelineOptions pipelineOptions,
+BeamFnDataClient beamFnDataClient,
+BeamFnStateClient beamFnStateClient,
+String pTransformId,
+PTransform pTransform,
+Supplier processBundleInstructionId,
+Map pCollections,
+Map coders,
+Map windowingStrategies,
+Multimap>> 
pCollectionIdsToConsumers,
+Consumer addStartFunction,
+Consumer addFinishFunction,
+BundleSplitListener splitListener)
+throws IOException {
+  // Get objects needed to create the runner.
+  RehydratedComponents rehydratedComponents =
+  RehydratedComponents.forComponents(
+  RunnerApi.Components.newBuilder()
+  .putAllCoders(coders)
+  .putAllWindowingStrategies(windowingStrategies)
+  .build());
+  String mainInputTag = 
Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+  RunnerApi.PCollection mainInput = 
pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+  WindowedValueCoder> inputCoder =
+  (WindowedValueCoder>)
+  rehydratedComponents.getCoder(mainInput.getCoderId());
+  Coder keyCoder = ((KvCoder) 
inputCoder.getValueCoder()).getKeyCoder();
+
+  CombinePayload combinePayload = 
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+  CombineFn combineFn =
+  (CombineFn)
+  SerializableUtils.deserializeFromByteArray(
+  
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), 
"CombineFn");
+  Coder accumCoder =
+  (Coder) 
rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());
+
+  Collection>>> consumers =
+  (Collection)
+  pCollectionIdsToConsumers.get(
+  
Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
+
+  // Create the runner.
+  PrecombineRunner runner =
+  new PrecombineRunner(
+  pipelineOptions,
+  combineFn,
+  MultiplexingFnDataReceiver.forConsumers(consumers),
+  keyCoder,
+  accumCoder);
+
+  // Register the appropriate handlers.
+  addStartFunction.accept(runner::startBundle);
+  pCollectionIdsToConsumers.put(
+  Iterables.getOnlyElement(pTransform.getInputsMap().values()),

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116645&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116645
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 27/Jun/18 22:53
Start Date: 27/Jun/18 22:53
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5795: 
[BEAM-3708] Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#discussion_r198663384
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -55,6 +83,127 @@
 }
   }
 
+  private static class PrecombineRunner {
+private PipelineOptions options;
+private CombineFn combineFn;
+private FnDataReceiver>> output;
+private Coder keyCoder;
+private GroupingTable, InputT, AccumT> groupingTable;
+private Coder accumCoder;
+
+PrecombineRunner(
+PipelineOptions options,
+CombineFn combineFn,
+FnDataReceiver>> output,
+Coder keyCoder,
+Coder accumCoder) {
+  this.options = options;
+  this.combineFn = combineFn;
+  this.output = output;
+  this.keyCoder = keyCoder;
+  this.accumCoder = accumCoder;
+}
+
+void startBundle() {
+  GroupingTables.Combiner, InputT, AccumT, ?> 
valueCombiner =
+  new ValueCombiner<>(
+  GlobalCombineFnRunners.create(combineFn), 
NullSideInputReader.empty(), options);
+
+  groupingTable =
+  GroupingTables.combiningAndSampling(
+  new WindowingCoderGroupingKeyCreator<>(keyCoder),
+  PairInfo.create(),
+  valueCombiner,
+  new 
CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
+  new CoderSizeEstimator<>(accumCoder),
+  0.001 /*sizeEstimatorSampleRate*/);
+}
+
+void processElement(WindowedValue> elem) throws Exception 
{
+  groupingTable.put(
+  elem,
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+
+void finishBundle() throws Exception {
+  groupingTable.flush(
+  (Object outputElem) ->
+  output.accept((WindowedValue>) outputElem)
+  );
+}
+  }
+
+  /** A factory for {@link PrecombineRunner}s. */
+  private static class PrecombineFactory
+  implements PTransformRunnerFactory> {
+
+@Override
+public PrecombineRunner createRunnerForPTransform(
+PipelineOptions pipelineOptions,
+BeamFnDataClient beamFnDataClient,
+BeamFnStateClient beamFnStateClient,
+String pTransformId,
+PTransform pTransform,
+Supplier processBundleInstructionId,
+Map pCollections,
+Map coders,
+Map windowingStrategies,
+Multimap>> 
pCollectionIdsToConsumers,
+Consumer addStartFunction,
+Consumer addFinishFunction,
+BundleSplitListener splitListener)
+throws IOException {
+  // Get objects needed to create the runner.
+  RehydratedComponents rehydratedComponents =
+  RehydratedComponents.forComponents(
+  RunnerApi.Components.newBuilder()
+  .putAllCoders(coders)
+  .putAllWindowingStrategies(windowingStrategies)
+  .build());
+  String mainInputTag = 
Iterables.getOnlyElement(pTransform.getInputsMap().keySet());
+  RunnerApi.PCollection mainInput = 
pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
+  WindowedValueCoder> inputCoder =
+  (WindowedValueCoder>)
+  rehydratedComponents.getCoder(mainInput.getCoderId());
+  Coder keyCoder = ((KvCoder) 
inputCoder.getValueCoder()).getKeyCoder();
+
+  CombinePayload combinePayload = 
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+  CombineFn combineFn =
+  (CombineFn)
+  SerializableUtils.deserializeFromByteArray(
+  
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), 
"CombineFn");
+  Coder accumCoder =
+  (Coder) 
rehydratedComponents.getCoder(combinePayload.getAccumulatorCoderId());
+
+  Collection>>> consumers =
+  (Collection)
+  pCollectionIdsToConsumers.get(
+  
Iterables.getOnlyElement(pTransform.getOutputsMap().values()));
+
+  // Create the runner.
+  PrecombineRunner runner =
+  new PrecombineRunner(
+  pipelineOptions,
+  combineFn,
+  MultiplexingFnDataReceiver.forConsumers(consumers),
+  keyCoder,
+  accumCoder);
+
+  // Register the appropriate handlers.
+  addStartFunction.accept(runner::startBundle);
 
 Review comment:
   You add the start function twice.


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116621&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116621
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 27/Jun/18 21:31
Start Date: 27/Jun/18 21:31
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5795: [BEAM-3708] Adding 
grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795#issuecomment-400835731
 
 
   R: @lukecwik


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 116621)
Time Spent: 3h  (was: 2h 50m)

> Implement the portable lifted Combiner transforms in Java SDK
> -
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-harness
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These 
> parts need to be implemented in the Java SDK harness so that the SDK can 
> actually execute them when receiving Combine transforms with the 
> corresponding URNs.
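
To make the split concrete, here is a minimal sketch of the three lifted phases (precombine, merge accumulators, extract outputs) in plain Java. `SimpleCombineFn`, `MeanFn`, and `LiftedCombineSketch` are hypothetical names introduced only for illustration; the four method names mirror Beam's `CombineFn`, but nothing here is the harness implementation, and URN dispatch and windowing are left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in that mirrors the method names of Beam's CombineFn. */
interface SimpleCombineFn<I, A, O> {
  A createAccumulator();

  A addInput(A accumulator, I input);

  A mergeAccumulators(Iterable<A> accumulators);

  O extractOutput(A accumulator);
}

/** Illustrative combiner: mean of integers, using a {sum, count} accumulator. */
class MeanFn implements SimpleCombineFn<Integer, long[], Double> {
  public long[] createAccumulator() {
    return new long[] {0, 0};
  }

  public long[] addInput(long[] acc, Integer input) {
    acc[0] += input;
    acc[1]++;
    return acc;
  }

  public long[] mergeAccumulators(Iterable<long[]> accumulators) {
    long[] merged = createAccumulator();
    for (long[] acc : accumulators) {
      merged[0] += acc[0];
      merged[1] += acc[1];
    }
    return merged;
  }

  public Double extractOutput(long[] acc) {
    return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
  }
}

class LiftedCombineSketch {
  /** Phase 1 (precombine): fold the raw inputs of one bundle into an accumulator. */
  static <I, A> A precombine(SimpleCombineFn<I, A, ?> fn, List<I> bundle) {
    A acc = fn.createAccumulator();
    for (I input : bundle) {
      acc = fn.addInput(acc, input);
    }
    return acc;
  }

  /** Runs all three phases for a single key whose values arrived in two bundles. */
  static double run() {
    MeanFn fn = new MeanFn();
    long[] first = precombine(fn, Arrays.asList(1, 2, 3));
    long[] second = precombine(fn, Arrays.asList(4, 6));
    // Phase 2 (merge accumulators): combine the partial results after grouping.
    long[] merged = fn.mergeAccumulators(Arrays.asList(first, second));
    // Phase 3 (extract outputs): turn the final accumulator into the result.
    return fn.extractOutput(merged); // 16 / 5 = 3.2
  }
}
```

Each phase touches only accumulators, which is what lets a runner place the grouping step between phases 1 and 2.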



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116619&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116619
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 27/Jun/18 21:29
Start Date: 27/Jun/18 21:29
Worklog Time Spent: 10m 
  Work Description: youngoli opened a new pull request #5795: [BEAM-3708] 
Adding grouping table to Precombine step.
URL: https://github.com/apache/beam/pull/5795
 
 
   Adding a grouping table to the Precombine step of a lifted Combine Per
   Key. This enables performing a Partial Group by Key optimization. The
   grouping table code is somewhat generic, so it can be reused in other
   runners that want to perform a Partial Group by Key.
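
As a rough illustration of the idea, a partial group by key can be sketched as a bounded in-memory table that combines values per key and emits its contents when flushed. This is an assumption-laden toy, not the `GroupingTable` code in this PR: `TinyGroupingTable` and its `maxEntries` parameter are hypothetical, and it flushes everything once the entry count is exceeded, whereas the real table's eviction and size-estimation policy are not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

/** Hypothetical, simplified combining table; not the Beam GroupingTable API. */
class TinyGroupingTable<K, V> {
  private final Map<K, V> table = new LinkedHashMap<>();
  private final BinaryOperator<V> combiner;
  private final int maxEntries;

  TinyGroupingTable(BinaryOperator<V> combiner, int maxEntries) {
    this.combiner = combiner;
    this.maxEntries = maxEntries;
  }

  /** Combines the value into the entry for its key, flushing if the table is too large. */
  void put(K key, V value, BiConsumer<K, V> output) {
    table.merge(key, value, combiner);
    if (table.size() > maxEntries) {
      flush(output);
    }
  }

  /** Emits every partially combined entry downstream and empties the table. */
  void flush(BiConsumer<K, V> output) {
    table.forEach(output);
    table.clear();
  }
}
```

In this sketch, `put` plays the role of a per-element call and `flush` the role of an end-of-bundle call. For example, with `Integer::sum` as the combiner, putting `("a", 1)`, `("b", 5)`, `("a", 2)` and then flushing emits `a=3` and `b=5`, so downstream stages see two elements instead of three.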
   
   Note for any reviewers:
   I wasn't entirely sure where to commit the GroupingTable code, since it's
   somewhat generic. I'm starting with the most specific directory it would
   fit in, but I may move the GroupingTable files to a new sub-directory named
   "utils" (or something similar), or to a completely different directory if
   anyone has suggestions.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
   




Issue Time Tracking
---

Worklog Id: (was: 116619)
Time Spent: 2h 50m  (was: 2h 40m)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116054&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116054
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 26/Jun/18 17:31
Start Date: 26/Jun/18 17:31
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
new file mode 100644
index 000..849ebf41d56
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Executes different components of Combine PTransforms.
+ */
+public class CombineRunners {
+
+  /**
+   * A registrar which provides a factory to handle combine component 
PTransforms.
+   */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+
+@Override
+public Map getPTransformRunnerFactories() 
{
+  return ImmutableMap.of(
+  
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE),
+  
MapFnRunners.forValueMapFnFactory(CombineRunners::createPrecombineMapFunction),
+  
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS),
+  
MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction),
+  
BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS),
+  
MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction),
+  
BeamUrns.getUrn(StandardPTransforms.Composites.COMBINE_GROUPED_VALUES),
+  
MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction));
+}
+  }
+
+  static 
+  ThrowingFunction, KV> 
createPrecombineMapFunction(
+  String pTransformId, PTransform pTransform) throws IOException {
+CombinePayload combinePayload = 
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+CombineFn combineFn =
+(CombineFn)
+SerializableUtils.deserializeFromByteArray(
+
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), 
"CombineFn");
+
+return (KV input) ->
+KV.of(input.getKey(), 
combineFn.addInput(combineFn.createAccumulator(), input.getValue()));
+  }
+
+  static 
+  ThrowingFunction>, KV>
+  createMergeAccumulatorsMapFunction(String pTransformId, PTransform 
pTransform)
+  throws IOException {
+CombinePayload combinePayload = 
CombinePayload.parseFrom(pTransform.getSpec().getPayload());
+CombineFn combineFn =
+(CombineFn)
+SerializableUtils.deserializeFromByteArray(
+
combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), 
"CombineFn");
+
+return (KV> input) ->
+KV.of(input.getKey(), 

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116053
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 26/Jun/18 17:30
Start Date: 26/Jun/18 17:30
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r198233212
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
 ##
 @@ -157,16 +155,15 @@ public void 
testFlattenWithDuplicateInputCollectionProducesMultipleOutputs() thr
 assertThat(consumers.keySet(), containsInAnyOrder("inputATarget", 
"mainOutputTarget"));
 
 assertThat(consumers.get("inputATarget"), hasSize(2));
-Iterator>> targets = 
consumers.get("inputATarget").iterator();
-FnDataReceiver> first = targets.next();
-FnDataReceiver> second = targets.next();
+FnDataReceiver> input =
+MultiplexingFnDataReceiver.forConsumers(consumers.get("inputATarget"));
 // Both of these are the flatten consumer
-assertThat(first, equalTo(second));
+//assertThat(first, equalTo(second));
 
 Review comment:
   minor note: I think you meant to delete these 3 commented lines.




Issue Time Tracking
---

Worklog Id: (was: 116053)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116042
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 26/Jun/18 17:19
Start Date: 26/Jun/18 17:19
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-400395190
 
 
   I think this is ready to be merged. The failed check below is an
   experimental precommit phase that this PR seems to have been run against
   accidentally while someone was testing it.




Issue Time Tracking
---

Worklog Id: (was: 116042)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=116009&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116009
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 26/Jun/18 16:04
Start Date: 26/Jun/18 16:04
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-400367762
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 116009)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114959&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114959
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 22/Jun/18 22:42
Start Date: 22/Jun/18 22:42
Worklog Time Spent: 10m 
  Work Description: youngoli removed a comment on issue #5622: [BEAM-3708] 
Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-399592699
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 114959)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114957
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 22/Jun/18 22:41
Start Date: 22/Jun/18 22:41
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-399602351
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 114957)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114958&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114958
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 22/Jun/18 22:41
Start Date: 22/Jun/18 22:41
Worklog Time Spent: 10m 
  Work Description: youngoli removed a comment on issue #5622: [BEAM-3708] 
Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-399591064
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 114958)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114945&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114945
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 22/Jun/18 21:53
Start Date: 22/Jun/18 21:53
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-399592699
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 114945)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=114935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114935
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 22/Jun/18 21:45
Start Date: 22/Jun/18 21:45
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-399591064
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 114935)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113974
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196970538
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.number.IsCloseTo.closeTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+
+/**
+ * Tests for {@link CombineRunners}.
+ */
+@RunWith(JUnit4.class)
+public class CombineRunnersTest {
+  private static class TestCombineFn extends CombineFn<Integer, Double, String> {
+
+    @Override
+    public Double createAccumulator() {
+      return 0.0;
+    }
+
+    @Override
+    public Double addInput(Double accum, Integer input) {
+      accum += input;
+      return accum;
+    }
+
+    @Override
+    public Double mergeAccumulators(Iterable<Double> accums) {
+      Double merged = 0.0;
+      for (Double accum : accums) {
+        merged += accum;
+      }
+      return merged;
+    }
+
+    @Override
+    public String extractOutput(Double accum) {
+      return accum.toString();
+    }
+  }
+
+  private static final String TEST_COMBINE_ID = "combineId";
+
+  private RunnerApi.PTransform pTransform;
+  private String inputPCollectionId;
+  private String outputPCollectionId;
+
+  @Before
+  public void createPipeline() throws Exception {
+    // Create pipeline with an input pCollection, combine, and output pCollection.
+    TestCombineFn combineFn = new TestCombineFn();
+    Combine.PerKey<String, Integer, String> combine = Combine.perKey(combineFn);
+
+    Pipeline p = Pipeline.create();
+    PCollection<KV<String, Integer>> inputPCollection = p.apply(Create.of(KV.of("unused", 0)));
+    inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+    PCollection<KV<String, String>> outputPCollection = inputPCollection
+        .apply(TEST_COMBINE_ID, combine);
+    outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    // Create FnApi protos needed for the runner.
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+inputPCollectionId = 

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113972=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113972
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196969501
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.number.IsCloseTo.closeTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
 
 Review comment:
   nit: remove extra white space.




Issue Time Tracking
---

Worklog Id: (was: 113972)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113973
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196970955
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.number.IsCloseTo.closeTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingRunnable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+
+/**
+ * Tests for {@link CombineRunners}.
+ */
+@RunWith(JUnit4.class)
+public class CombineRunnersTest {
+  private static class TestCombineFn extends CombineFn<Integer, Double, String> {
+
+    @Override
+    public Double createAccumulator() {
+      return 0.0;
+    }
+
+    @Override
+    public Double addInput(Double accum, Integer input) {
+      accum += input;
+      return accum;
+    }
+
+    @Override
+    public Double mergeAccumulators(Iterable<Double> accums) {
+      Double merged = 0.0;
+      for (Double accum : accums) {
+        merged += accum;
+      }
+      return merged;
+    }
+
+    @Override
+    public String extractOutput(Double accum) {
+      return accum.toString();
+    }
+  }
+
+  private static final String TEST_COMBINE_ID = "combineId";
+
+  private RunnerApi.PTransform pTransform;
+  private String inputPCollectionId;
+  private String outputPCollectionId;
+
+  @Before
+  public void createPipeline() throws Exception {
+    // Create pipeline with an input pCollection, combine, and output pCollection.
+    TestCombineFn combineFn = new TestCombineFn();
+    Combine.PerKey<String, Integer, String> combine = Combine.perKey(combineFn);
+
+    Pipeline p = Pipeline.create();
+    PCollection<KV<String, Integer>> inputPCollection = p.apply(Create.of(KV.of("unused", 0)));
+    inputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+    PCollection<KV<String, String>> outputPCollection = inputPCollection
+        .apply(TEST_COMBINE_ID, combine);
+    outputPCollection.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    // Create FnApi protos needed for the runner.
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
+inputPCollectionId = 

[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113971=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113971
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196971687
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
 ##
 @@ -109,7 +109,7 @@ public void testCreatingAndProcessingDoFlatten() throws Exception {
   }
 
   /**
-   * Create a Flatten that has 4 inputs (inputATarget1, inputATarget2, inputBTarget, inputCTarget)
+   * Create a Flatten that has 4 inputs (inputATarget1, inputATarget1, inputATarget2, inputATarget2)
 
 Review comment:
   This flatten only has two inputs (`inputA`, `inputAAgain`) and not 4. 
Listing them isn't important to the test so consider saying this test consumes 
data from the same PCollection.
   
   Also, on line 158, consider using MultiplexingFnDataReceiver.forConsumers to 
have a single target instead of invoking each target separately.
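The "single target" idea in the comment above can be sketched without Beam: wrap several consumers in one consumer that forwards each element to all of them. This is only an illustration using `java.util.function.Consumer`; it is not Beam's `MultiplexingFnDataReceiver`, and the class `MultiplexingSketch` is hypothetical. The method name `forConsumers` is borrowed from the comment purely for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of fanning one element out to several consumers through one target. */
public class MultiplexingSketch {

  /** Returns a single consumer that forwards every element to each given consumer, in order. */
  public static <T> Consumer<T> forConsumers(List<Consumer<T>> consumers) {
    if (consumers.size() == 1) {
      // Nothing to multiplex; hand back the only consumer directly.
      return consumers.get(0);
    }
    return element -> {
      for (Consumer<T> consumer : consumers) {
        consumer.accept(element);
      }
    };
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    Consumer<String> inputA = value -> seen.add("inputA:" + value);
    Consumer<String> inputAAgain = value -> seen.add("inputAAgain:" + value);

    // The test code only needs to talk to one target instead of invoking each separately.
    Consumer<String> target = forConsumers(Arrays.asList(inputA, inputAAgain));
    target.accept("x");

    System.out.println(seen);
    // prints [inputA:x, inputAAgain:x]
  }
}
```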




Issue Time Tracking
---

Worklog Id: (was: 113971)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113969=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113969
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196966468
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Executes different components of Combine PTransforms.
+ */
+public class CombineRunners {
+
+  private static final String PRECOMBINE_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
+  private static final String MERGE_ACCUMULATORS_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
+  private static final String EXTRACT_OUTPUTS_URN =
+      BeamUrns.getUrn(StandardPTransforms.CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
+  private static final String COMBINE_GROUPED_VALUES_URN =
+      BeamUrns.getUrn(StandardPTransforms.Composites.COMBINE_GROUPED_VALUES);
+
+  /**
+   * A registrar which provides a factory to handle pre-combine PTransforms.
 
 Review comment:
   nit: the comment says this is only about pre-combine, but in reality it's 
about the combine composite being expanded.
   
   




Issue Time Tracking
---

Worklog Id: (was: 113969)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=113970=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113970
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 20/Jun/18 23:24
Start Date: 20/Jun/18 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5622: 
[BEAM-3708] Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#discussion_r196966761
 
 

 ##
 File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Executes different components of Combine PTransforms.
+ */
+public class CombineRunners {
+
+  private static final String PRECOMBINE_URN =
 
 Review comment:
   Not much value in declaring these constants here again: each is used only 
once in this class, so reference them directly at the point of use.




Issue Time Tracking
---

Worklog Id: (was: 113970)

> Implement the portable lifted Combiner transforms in Java SDK
> -
>
> Key: BEAM-3708
> URL: https://issues.apache.org/jira/browse/BEAM-3708
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core, sdk-java-harness
>Reporter: Daniel Oliveira
>Assignee: Daniel Oliveira
>Priority: Major
>  Labels: portability
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Lifted combines are split into separate parts with different URNs. These 
> parts need to be implemented in the Java SDK harness so that the SDK can 
> actually execute them when receiving Combine transforms with the 
> corresponding URNs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111320=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111320
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 12/Jun/18 23:09
Start Date: 12/Jun/18 23:09
Worklog Time Spent: 10m 
  Work Description: youngoli opened a new pull request #5622: [BEAM-3708] 
Adding Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622
 
 
   Adding implementations for the components represented by the Combine
   URNs described in the portable combines doc:
   https://s.apache.org/beam-runner-api-combine-model
   
   This is just an initial implementation, so it lacks one major feature:
   an optimization in the Precombine. I plan to add that right away, but
   want to get this in first.
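   The Precombine optimization is not described in this message; a later pull
   request in this thread (#5795) is titled "Adding grouping table to Precombine
   step". Assuming the idea is to buffer one accumulator per key inside a bundle
   rather than emit one accumulator per input element, a minimal Beam-free sketch
   could look like the following. The class `PrecombineTableSketch`, the `maxKeys`
   limit, and the flush-everything policy are all assumptions for illustration,
   not the harness implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a per-bundle grouping table for the precombine phase. */
public class PrecombineTableSketch {
  private final Map<String, Double> table = new LinkedHashMap<>();
  private final int maxKeys;
  private final BiConsumer<String, Double> output;

  public PrecombineTableSketch(int maxKeys, BiConsumer<String, Double> output) {
    this.maxKeys = maxKeys;
    this.output = output;
  }

  /** Folds the input into the buffered accumulator for its key instead of emitting it. */
  public void processElement(String key, Integer input) {
    Double accum = table.get(key);
    if (accum == null) {
      accum = 0.0; // plays the role of createAccumulator()
    }
    table.put(key, accum + input); // plays the role of addInput()
    if (table.size() > maxKeys) {
      flush(); // assumed policy: emit everything once the table grows too large
    }
  }

  /** Emits whatever is still buffered; call once at the end of the bundle. */
  public void finishBundle() {
    flush();
  }

  private void flush() {
    for (Map.Entry<String, Double> entry : table.entrySet()) {
      output.accept(entry.getKey(), entry.getValue());
    }
    table.clear();
  }

  public static void main(String[] args) {
    List<String> emitted = new ArrayList<>();
    PrecombineTableSketch precombine =
        new PrecombineTableSketch(2, (key, accum) -> emitted.add(key + "=" + accum));
    precombine.processElement("a", 1);
    precombine.processElement("a", 3);
    precombine.processElement("b", 2);
    precombine.finishBundle();
    System.out.println(emitted);
    // prints [a=4.0, b=2.0]
  }
}
```

   With three raw inputs, only two accumulators leave the precombine step, which
   is the kind of saving such a table is meant to provide.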
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   




Issue Time Tracking
---

Worklog Id: (was: 111320)
Time Spent: 10m
Remaining Estimate: 0h



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111323=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111323
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 12/Jun/18 23:09
Start Date: 12/Jun/18 23:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-396762786
 
 
   R: @lukecwik




Issue Time Tracking
---

Worklog Id: (was: 111323)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-3708) Implement the portable lifted Combiner transforms in Java SDK

2018-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3708?focusedWorklogId=111322=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111322
 ]

ASF GitHub Bot logged work on BEAM-3708:


Author: ASF GitHub Bot
Created on: 12/Jun/18 23:09
Start Date: 12/Jun/18 23:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on issue #5622: [BEAM-3708] Adding 
Combine component implementations to Java SDK
URL: https://github.com/apache/beam/pull/5622#issuecomment-396762786
 
 
   @lukecwik




Issue Time Tracking
---

Worklog Id: (was: 111322)
Time Spent: 20m  (was: 10m)
