[ https://issues.apache.org/jira/browse/BEAM-1102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15727742#comment-15727742 ]

Aljoscha Krettek commented on BEAM-1102:
----------------------------------------

The problem is this part in {{FlinkProcessContextBase}}:

{code}
  @Override
  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
  createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
        new SerializableFnAggregatorWrapper<>(combiner);
    Accumulator<?, ?> existingAccum =
        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
    if (existingAccum != null) {
      // Returns the fresh, unregistered wrapper instead of existingAccum.
      return wrapper;
    } else {
      runtimeContext.addAccumulator(name, wrapper);
    }
    return wrapper;
  }
{code}

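Notice how, if an accumulator with that name already exists, the method returns the
newly created wrapper instead of the existing accumulator. That wrapper is never
registered with the runtime context, so values added to it are never reported.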
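To illustrate how this can end up reporting 0 in batch mode, here is a minimal, self-contained Java model of the lookup. `Registry` and `SumWrapper` are hypothetical stand-ins for Flink's runtime-context accumulator map and Beam's `SerializableFnAggregatorWrapper`, not the real classes. The registry is assumed to reject duplicate names, and the scenario assumes the aggregator is created twice under the same name, with the operator writing to the second instance. `createFixed` sketches one possible fix (reuse the registered accumulator); it is not necessarily the change that was adopted in Beam.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the accumulator lookup; not Flink or Beam code.
class AggregatorLookupSketch {

  /** Hypothetical stand-in for SerializableFnAggregatorWrapper (a simple sum). */
  static final class SumWrapper {
    private long value = 0;
    void addValue(long v) { value += v; }
    long getLocalValue() { return value; }
  }

  /** Hypothetical stand-in for the runtime context's accumulator registry. */
  static final class Registry {
    private final Map<String, SumWrapper> accumulators = new HashMap<>();
    SumWrapper getAccumulator(String name) { return accumulators.get(name); }
    void addAccumulator(String name, SumWrapper acc) {
      // Assumption: duplicate names are rejected, hence the null check before adding.
      if (accumulators.containsKey(name)) {
        throw new UnsupportedOperationException("already registered: " + name);
      }
      accumulators.put(name, acc);
    }
  }

  /** Mirrors the quoted code: on a name clash it returns an unregistered wrapper. */
  static SumWrapper createBuggy(Registry ctx, String name) {
    SumWrapper wrapper = new SumWrapper();
    if (ctx.getAccumulator(name) != null) {
      return wrapper;
    } else {
      ctx.addAccumulator(name, wrapper);
    }
    return wrapper;
  }

  /** Possible fix: hand back the accumulator that is already registered. */
  static SumWrapper createFixed(Registry ctx, String name) {
    SumWrapper existing = ctx.getAccumulator(name);
    if (existing != null) {
      return existing;
    }
    SumWrapper wrapper = new SumWrapper();
    ctx.addAccumulator(name, wrapper);
    return wrapper;
  }

  static SumWrapper create(boolean fixed, Registry ctx, String name) {
    if (fixed) {
      return createFixed(ctx, name);
    }
    return createBuggy(ctx, name);
  }

  /** Creates the aggregator twice, adds 27 values via the second one, reads the registry. */
  static long simulate(boolean fixed) {
    Registry ctx = new Registry();
    create(fixed, ctx, "emptyLines");                   // first call registers it
    SumWrapper agg = create(fixed, ctx, "emptyLines");  // second call, same name
    for (int i = 0; i < 27; i++) {
      agg.addValue(1);
    }
    // The value the runner would report is whatever the registered accumulator holds.
    return ctx.getAccumulator("emptyLines").getLocalValue();
  }

  public static void main(String[] args) {
    System.out.println("buggy: " + simulate(false)); // prints "buggy: 0"
    System.out.println("fixed: " + simulate(true));  // prints "fixed: 27"
  }
}
```

In the buggy path, all 27 additions land in the second wrapper, which the registry never sees; in the fixed path both calls share one registered instance.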

> Flink Batch Runner does not populate aggregator values
> ------------------------------------------------------
>
>                 Key: BEAM-1102
>                 URL: https://issues.apache.org/jira/browse/BEAM-1102
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.3.0-incubating
>            Reporter: Daniel Halperin
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>
> Running the quickstart gives 0 for emptyLines.
> Running with {{--streaming=true}} gives the correct value (for my input file, 
> the default examples archetype {{pom.xml}}, the true value is 27 at the time 
> of writing).
> Streaming output:
> {code}
> Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Final aggregator values:
> Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: DroppedDueToLateness : 0
> Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: emptyLines : 27
> Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: DroppedDueToClosedWindow : 0
> {code}
> Non-streaming output:
> {code}
> Dec 07, 2016 12:58:07 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Final aggregator values:
> Dec 07, 2016 12:58:07 PM org.apache.beam.runners.flink.FlinkRunner run
> INFO: emptyLines : 0
> {code}
> (Note also that the lateness etc. aggregators are missing entirely; this may be
> expected.)
