Re: mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Just saw I'm not calling state.update() in my trackState function. I
guess that is the issue!

On Fri, Jan 29, 2016 at 9:36 AM, Sebastian Piu 
wrote:

> Hi All,
>
> I'm playing with the new mapWithState functionality but I can't get it
> quite to work yet.
>
> I'm doing two print() calls on the stream:
> 1. after mapWithState() call, first batch shows results - next batches
> yield empty
> 2. after stateSnapshots(), always yields an empty RDD
>
> Any pointers on what might be wrong?
>
> This is the code I'm running:
>
> final StateSpec state = StateSpec.function(UseCase::trackState);
> JavaPairDStream pairs = messages. Double>mapToPair(UseCase::mapToPair);JavaMapWithStateDStream Double, Double, Double> stateMap = pairs.mapWithState(state);
>
> stateMap.print(5);
> stateMap.stateSnapshots()
> .print(5);
>
> stream.context().remember(minutes(120));stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");
>
> private static Optional> trackState(Time 
> batchTime, GroupingKey key, Optional value, State state) {
> Double current = state.exists() ? state.get() : 0.0;
> Double sum = current + value.or(0.0);
> return Optional.of(new Tuple2<>(key, sum));
> }
>
>
> Cheers,
>
> Seb
>
>


mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Hi All,

I'm playing with the new mapWithState functionality but I can't get it
quite to work yet.

I'm doing two print() calls on the stream:
1. after mapWithState() call, first batch shows results - next batches
yield empty
2. after stateSnapshots(), always yields an empty RDD

Any pointers on what might be wrong?

This is the code I'm running:

final StateSpec state = StateSpec.function(UseCase::trackState);
JavaPairDStream pairs = messages.mapToPair(UseCase::mapToPair);JavaMapWithStateDStream stateMap = pairs.mapWithState(state);

stateMap.print(5);
stateMap.stateSnapshots()
.print(5);

stream.context().remember(minutes(120));stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");

private static Optional> trackState(Time
batchTime, GroupingKey key, Optional value, State
state) {
Double current = state.exists() ? state.get() : 0.0;
Double sum = current + value.or(0.0);
return Optional.of(new Tuple2<>(key, sum));
}


Cheers,

Seb