I’ll try to go into a bit more detail about the windows here. What you can do 
is this:

DataStream<Tuple3<String, Double, Long>> input = … // fields are (id, sum, 
count), where count is initialized to 1, similar to word count

DataStream<Tuple3<String, Double, Long>> counts = input
  .reduce(new MyCountingReducer())

DataStream<Tuple3<String, Double, Long>> result = counts.map( <mapper that 
divides sum by count> )

Does this help? Here, you don’t even have to deal with state, the windowing 
system will keep the state (i.e. the reduced) value in internal state in a 
fault tolerant fashion.

> On 26 Nov 2015, at 17:14, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> In streaming, there is no "end" of the stream when you would emit the final 
> sum. That's why there are windows.
> If you do not want the partial sums, but only the final sum, you need to 
> define what window in which the sum is computed. At the end of that window, 
> that value is emitted. The window can be based on time, counts, or other 
> measures.
> Greetings,
> Stephan
> On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier <javier.lo...@zalando.de> 
> wrote:
> Hi, thanks for the answer. It worked but not in the way we expected. We 
> expect to have only one sum per ID and we are getting all the consecutive 
> sums, for example:
> We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the initial 
> values are ID -> 11, values -> 1,2,3). Here is the code we are using for our 
> test:
> DataStream<T
> uple2<String, Double>> stream = ...;
> DataStream<Tuple4<String, Double, Long, Double>> result = 
> stream.keyBy(0).map(new RollingSum());
> public static class RollingSum extends RichMapFunction<Tuple2<String, 
> Double>, Tuple4<String, Double, Long, Double>> {
>         // persistent counter
>       private OperatorState<Double> sum;
>       private OperatorState<Long> count;
>         @Override
>         public Tuple4<String, Double, Long, Double> map(Tuple2<String, 
> Double> value1) {
>               try {
>                       Double newSum = sum.value()+value1.f1;
>                               sum.update(newSum);
>                               count.update(count.value()+1);
>                               return new Tuple4<String, Double, Long, 
> Double>(value1.f0,sum.value(),count.value(),sum.value()/count.value());
>                       } catch (IOException e) {
>                               // TODO Auto-generated catch block
>                               e.printStackTrace();
>                       }
>               return null;
>         }
>         @Override
>         public void open(Configuration config) {
>             sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 
> 0D);
>             count = getRuntimeContext().getKeyValueState("myCounter", 
> Long.class, 0L);
>         }
>     }
> We are using a Tuple4 because we want to calculate the sum and the average 
> (So our Tuple is ID, SUM, Count, AVG). Do we need to add another step to get 
> a single value out of it? or is this the expected behavior.
> Thanks again for your help.
> On 25 November 2015 at 17:19, Stephan Ewen <se...@apache.org> wrote:
> Hi Javier!
> You can solve this both using windows, or using manual state.
> What is better depends a bit on when you want to have the result (the sum). 
> Do you want a result emitted after each update (or do some other operation 
> with that value) or do you want only the final sum after a certain time?
> For the second variant, I would use a window, for the first variant, you 
> could use custom state as follows:
> For each element, you take the current state for the key, add the value to 
> get the new sum. Then you update the state with the new sum and emit the 
> value as well...
> Java:
> DataStream<T
> uple2<String, Long>> stream = ...;
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new 
> RollingSum());
> public
>  class RollingSum extends RichMapFunction<Tuple2<String, Long>, 
> Tuple2<String, Long>> {
> private OperatorState<Long> sum;
> @Override
> public Tuple2<String, Long> map(Tuple2<String, Long> value) {
>         long 
> newSum = sum.value() + value.f1;
>         sum.update(newSum);
> return new Tuple2<>(value.f0, newSum);
> }
> @Override
> public void open(Configuration config) {
> counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> }
> }
> In Scala, you can write this briefly as:
> val stream: DataStream[(String, Int)] = ...
> val counts: DataStream[(String, Int)] = stream
> .keyBy(_._1)
> .mapWithState((in: (String, Int), sum: Option[Int]) 
> => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ( (
> in._1, newSum), Some(newSum) )
>  }
> Does that help?
> Thanks also for pointing out the error in the sample code...
> Greetings,
> Stephan
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lo...@zalando.de> 
> wrote:
> Hi,
> We are trying to do a test using States but we have not been able to achieve 
> our desired result. Basically we have a data stream with data as 
> [{"id":"11","value":123}] and we want to calculate the sum of all values 
> grouping by ID. We were able to achieve this using windows but not with  
> states. The example that is in the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
>  is not very clear and even has some errors, for example:
> public class CounterSum implements RichReduceFunction<Long>
> should be
> public class CounterSum extends RichReduceFunction<Long>
> as RichReduceFuncion is a Class, not an interface.
> We wanted to ask you if you have an example of how to use States with Flink. 
> Thanks in advance for your help.

