[jira] [Commented] (BEAM-96) Support composing combine functions
[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15200406#comment-15200406 ]

ASF GitHub Bot commented on BEAM-96:

Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/23

> Support composing combine functions
> ---
>
>          Key: BEAM-96
>          URL: https://issues.apache.org/jira/browse/BEAM-96
>      Project: Beam
>   Issue Type: New Feature
>   Components: sdk-java-core
>     Reporter: Pei He
>     Assignee: Pei He
>
> The proposal for composed combine functions is the following:
> pc.apply(
>     Combine.perKey(
>         CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn(), meanLatencyTag)));
> Example code:
> * PCollection<KV<K, Integer>> latencies = ...;
> *
> * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
> * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
> *
> * SimpleFunction<Integer, Integer> identityFn =
> *     new SimpleFunction<Integer, Integer>() {
> *       @Override
> *       public Integer apply(Integer input) {
> *         return input;
> *       }};
> * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
> *     Combine.perKey(
> *         CombineFns.composeKeyed()
> *             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
> *             .with(identityFn, new MeanFn(), meanLatencyTag)));
> *
> * PCollection<T> finalResultCollection = maxAndMean
> *     .apply(ParDo.of(
> *         new DoFn<KV<K, CoCombineResult>, T>() {
> *           @Override
> *           public void processElement(ProcessContext c) throws Exception {
> *             KV<K, CoCombineResult> e = c.element();
> *             Integer maxLatency = e.getValue().get(maxLatencyTag);
> *             Double meanLatency = e.getValue().get(meanLatencyTag);
> *             Do Something
> *             c.output(...some T...);
> *           }
> *         }));

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BEAM-96) Support composing combine functions
[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203110#comment-15203110 ]

Davor Bonaci commented on BEAM-96:
----------------------------------

Pei, is this done?
[jira] [Commented] (BEAM-96) Support composing combine functions
[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15180626#comment-15180626 ]

ASF GitHub Bot commented on BEAM-96:

GitHub user peihe opened a pull request:

    https://github.com/apache/incubator-beam/pull/23

    [BEAM-96] Add composed combine functions builders in CombineFns

    * compose() or composeKeyed() are used to start composition
    * with() is used to add an input-transformation, a combine fn and an output TupleTag
    * A non-CombineFn builder is used to ensure that every composition includes at least one item
    * Duplicate output tags are not allowed in the same composition

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peihe/incubator-beam composed-combine

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/23.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23

commit 5ceefe0a78add3742f01ec9fdb6d17cf59192cf3
Author: Pei He
Date:   2016-03-04T21:54:34Z

    [BEAM-96] Add composed combine functions builders in CombineFns

    * compose() or composeKeyed() are used to start composition
    * with() is used to add an input-transformation, a combine fn and an output TupleTag
    * A non-CombineFn builder is used to ensure that every composition includes at least one item
    * Duplicate output tags are not allowed in the same composition

> Support composing combine functions
> ---
>
>          Key: BEAM-96
>          URL: https://issues.apache.org/jira/browse/BEAM-96
>      Project: Beam
>   Issue Type: New Feature
>   Components: sdk-java-core
>     Reporter: Pei He
>     Assignee: Davor Bonaci
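
The four builder rules in the pull request description (compose() starts a composition, with() adds an input transformation, a combine function and an output tag, the starting builder is not itself a combine function, and duplicate output tags are rejected) can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: it does not use or reproduce the Beam SDK, and every type in it (Tag, Combiner, Builder, Composed, Result, MaxInt, MeanInt) is a hypothetical stand-in invented for illustration. Tags here are compared by object identity, which is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical plain-Java model of the builder rules; none of these types are Beam classes.
public class ComposedCombineSketch {
  /** Typed handle for one output (stand-in for TupleTag); compared by identity. */
  static final class Tag<O> {
    final String id;
    Tag(String id) { this.id = id; }
  }

  /** Minimal combine function: input I, accumulator A, output O. */
  interface Combiner<I, A, O> {
    A create();
    A add(A acc, I input);
    O extract(A acc);
  }

  /** One with(...) entry plus its running accumulator. */
  private static final class Part<D, I, A, O> {
    private final Function<D, I> extract;
    private final Combiner<I, A, O> fn;
    private final Tag<O> tag;
    private A acc;
    Part(Function<D, I> extract, Combiner<I, A, O> fn, Tag<O> tag) {
      this.extract = extract; this.fn = fn; this.tag = tag;
    }
    void reset() { acc = fn.create(); }
    void add(D element) { acc = fn.add(acc, extract.apply(element)); }
    void finish(Map<Tag<?>, Object> out) { out.put(tag, fn.extract(acc)); }
  }

  /** Starting point. It has no combine() method, so an empty composition cannot be used. */
  static final class Builder {
    <D, I, A, O> Composed<D> with(Function<D, I> extract, Combiner<I, A, O> fn, Tag<O> tag) {
      return new Composed<D>().with(extract, fn, tag);
    }
  }

  static Builder compose() { return new Builder(); }

  /** A composition holding at least one part; only this type can combine. */
  static final class Composed<D> {
    private final List<Part<D, ?, ?, ?>> parts = new ArrayList<>();
    private final Set<Tag<?>> tags = new HashSet<>();

    <I, A, O> Composed<D> with(Function<D, I> extract, Combiner<I, A, O> fn, Tag<O> tag) {
      if (!tags.add(tag)) {
        throw new IllegalArgumentException("Duplicate output tag: " + tag.id);
      }
      parts.add(new Part<D, I, A, O>(extract, fn, tag));
      return this;
    }

    /** Feeds every element to every part in a single pass over the input. */
    Result combine(Iterable<D> inputs) {
      for (Part<D, ?, ?, ?> p : parts) p.reset();
      for (D element : inputs) {
        for (Part<D, ?, ?, ?> p : parts) p.add(element);
      }
      Map<Tag<?>, Object> values = new HashMap<>();
      for (Part<D, ?, ?, ?> p : parts) p.finish(values);
      return new Result(values);
    }
  }

  /** Holds one output per tag; the tag's type parameter makes get() type-safe for callers. */
  static final class Result {
    private final Map<Tag<?>, Object> values;
    Result(Map<Tag<?>, Object> values) { this.values = values; }
    @SuppressWarnings("unchecked")
    <O> O get(Tag<O> tag) { return (O) values.get(tag); }
  }

  static final class MaxInt implements Combiner<Integer, Integer, Integer> {
    public Integer create() { return Integer.MIN_VALUE; }
    public Integer add(Integer acc, Integer input) { return Math.max(acc, input); }
    public Integer extract(Integer acc) { return acc; }
  }

  static final class MeanInt implements Combiner<Integer, long[], Double> {
    public long[] create() { return new long[] {0, 0}; } // {sum, count}
    public long[] add(long[] acc, Integer input) { acc[0] += input; acc[1]++; return acc; }
    public Double extract(long[] acc) { return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1]; }
  }

  public static void main(String[] args) {
    Tag<Integer> maxLatencyTag = new Tag<>("max");
    Tag<Double> meanLatencyTag = new Tag<>("mean");
    Composed<Integer> fn = compose()
        .with(Function.<Integer>identity(), new MaxInt(), maxLatencyTag)
        .with(Function.<Integer>identity(), new MeanInt(), meanLatencyTag);
    Result r = fn.combine(Arrays.asList(10, 20, 30));
    System.out.println(r.get(maxLatencyTag) + " " + r.get(meanLatencyTag)); // prints: 30 20.0
  }
}
```

In this sketch the "at least one item" rule is enforced by the type system rather than by a runtime check: Builder offers only with(), and only the Composed value that with() returns can be applied to data. Adding the same tag object twice fails immediately with an IllegalArgumentException.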