[jira] [Commented] (BEAM-96) Support composing combine functions

2016-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200406#comment-15200406
 ] 

ASF GitHub Bot commented on BEAM-96:


GitHub user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/23


> Support composing combine functions
> ---
>
>         Key: BEAM-96
>         URL: https://issues.apache.org/jira/browse/BEAM-96
>     Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>    Reporter: Pei He
>    Assignee: Pei He
>
> The proposal for composed combine functions is as follows:
> pc.apply(
>     Combine.perKey(
>         CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
> * PCollection<KV<K, Integer>> latencies = ...;
> *
> * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
> * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
> *
> * SimpleFunction<Integer, Integer> identityFn =
> *     new SimpleFunction<Integer, Integer>() {
> *       @Override
> *       public Integer apply(Integer input) {
> *         return input;
> *       }};
> * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
> *     Combine.<K, Integer, CoCombineResult>perKey(
> *         CombineFns.composeKeyed()
> *             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
> *             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> *
> * PCollection<T> finalResultCollection = maxAndMean
> *     .apply(ParDo.of(
> *         new DoFn<KV<K, CoCombineResult>, T>() {
> *           @Override
> *           public void processElement(ProcessContext c) throws Exception {
> *             KV<K, CoCombineResult> e = c.element();
> *             Integer maxLatency = e.getValue().get(maxLatencyTag);
> *             Double meanLatency = e.getValue().get(meanLatencyTag);
> *             // ... do something ...
> *             c.output(...some T...);
> *           }
> *         }));
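
For readers without the SDK at hand, the idea behind the proposal can be sketched in plain Java. This is only an illustration under stated assumptions: Tag, Combiner, Composed, Result and maxAndMean are hypothetical stand-ins invented for this sketch, not Beam classes, and the real CombineFns implementation may differ. The sketch shows each with(...) entry keeping its own accumulator, every input being fed to all accumulators in a single pass, and the outputs being read back by tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only: none of these types are Beam classes.
final class ComposedCombineSketch {

  /** Hypothetical typed output tag; tags are compared by object identity. */
  static final class Tag<T> {}

  /** Hypothetical minimal combine function: input I, accumulator A, output O. */
  interface Combiner<I, A, O> {
    A createAccumulator();
    A addInput(A accumulator, I input);
    O extractOutput(A accumulator);
  }

  /** Holds one output per tag, read back with a typed get(). */
  static final class Result {
    private final Map<Tag<?>, Object> values = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> T get(Tag<T> tag) {
      return (T) values.get(tag);
    }
  }

  /** One with(...) entry: input transformation, combiner, and output tag. */
  private static final class Component<I, X, A, O> {
    private final Function<I, X> extract;
    private final Combiner<X, A, O> fn;
    private final Tag<O> tag;

    Component(Function<I, X> extract, Combiner<X, A, O> fn, Tag<O> tag) {
      this.extract = extract;
      this.fn = fn;
      this.tag = tag;
    }

    Object newAccumulator() {
      return fn.createAccumulator();
    }

    @SuppressWarnings("unchecked")
    Object add(Object accumulator, I input) {
      return fn.addInput((A) accumulator, extract.apply(input));
    }

    @SuppressWarnings("unchecked")
    void emit(Object accumulator, Result result) {
      result.values.put(tag, fn.extractOutput((A) accumulator));
    }
  }

  /** The composed function: a list of components sharing one pass over the input. */
  static final class Composed<I> {
    private final List<Component<I, ?, ?, ?>> components = new ArrayList<>();

    <X, A, O> Composed<I> with(Function<I, X> extract, Combiner<X, A, O> fn, Tag<O> tag) {
      components.add(new Component<I, X, A, O>(extract, fn, tag));
      return this;
    }

    Result combine(Iterable<I> inputs) {
      Object[] accumulators = new Object[components.size()];
      for (int i = 0; i < accumulators.length; i++) {
        accumulators[i] = components.get(i).newAccumulator();
      }
      for (I input : inputs) { // single pass: every component sees every input
        for (int i = 0; i < accumulators.length; i++) {
          accumulators[i] = components.get(i).add(accumulators[i], input);
        }
      }
      Result result = new Result();
      for (int i = 0; i < accumulators.length; i++) {
        components.get(i).emit(accumulators[i], result);
      }
      return result;
    }
  }

  static final Combiner<Integer, Integer, Integer> MAX = new Combiner<Integer, Integer, Integer>() {
    public Integer createAccumulator() { return Integer.MIN_VALUE; }
    public Integer addInput(Integer acc, Integer in) { return Math.max(acc, in); }
    public Integer extractOutput(Integer acc) { return acc; }
  };

  static final Combiner<Integer, long[], Double> MEAN = new Combiner<Integer, long[], Double>() {
    public long[] createAccumulator() { return new long[] {0, 0}; } // {sum, count}
    public long[] addInput(long[] acc, Integer in) { acc[0] += in; acc[1]++; return acc; }
    public Double extractOutput(long[] acc) {
      return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }
  };

  static final Tag<Integer> MAX_TAG = new Tag<>();
  static final Tag<Double> MEAN_TAG = new Tag<>();

  /** Combines the values of a single key; a per-key combine would call this once per key. */
  static Result maxAndMean(List<Integer> latencies) {
    return new Composed<Integer>()
        .with(x -> x, MAX, MAX_TAG)
        .with(x -> x, MEAN, MEAN_TAG)
        .combine(latencies);
  }

  public static void main(String[] args) {
    Result r = maxAndMean(Arrays.asList(10, 30, 20));
    System.out.println("max=" + r.get(MAX_TAG) + " mean=" + r.get(MEAN_TAG)); // max=30 mean=20.0
  }
}
```

The typed tag is what lets get() return an Integer for one component and a Double for the other without a cast at the call site, which mirrors how the example above reads maxLatency and meanLatency from the combined value.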



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-96) Support composing combine functions

2016-03-19 Thread Davor Bonaci (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203110#comment-15203110
 ] 

Davor Bonaci commented on BEAM-96:
--

Pei, is this done?



[jira] [Commented] (BEAM-96) Support composing combine functions

2016-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180626#comment-15180626
 ] 

ASF GitHub Bot commented on BEAM-96:


GitHub user peihe opened a pull request:

https://github.com/apache/incubator-beam/pull/23

[BEAM-96] Add composed combine functions builders in CombineFns

* compose() or composeKeyed() is used to start a composition
* with() is used to add an input transformation, a combine fn, and an output TupleTag
* A non-CombineFn builder is used to ensure that every composition includes at least one item
* Duplicate output tags are not allowed in the same composition

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/peihe/incubator-beam composed-combine

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/23.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23


commit 5ceefe0a78add3742f01ec9fdb6d17cf59192cf3
Author: Pei He 
Date:   2016-03-04T21:54:34Z

[BEAM-96] Add composed combine functions builders in CombineFns

* compose() or composeKeyed() is used to start a composition
* with() is used to add an input transformation, a combine fn, and an output TupleTag
* A non-CombineFn builder is used to ensure that every composition includes at least one item
* Duplicate output tags are not allowed in the same composition




> Support composing combine functions
> ---
>
>         Key: BEAM-96
>         URL: https://issues.apache.org/jira/browse/BEAM-96
>     Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>    Reporter: Pei He
>    Assignee: Davor Bonaci