I just noticed that I'm not calling state.update() in my trackState function, so the state is never stored. I guess that is the issue!
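
A minimal sketch of the missing call, runnable without Spark. FakeState is a hypothetical stand-in for Spark's State<Double> that only mimics exists(), get() and update(), and java.util.Optional (with orElse) replaces the Optional type from the quoted code. Both are assumptions made for illustration, not the real Spark types:

```java
import java.util.Optional;

// Hypothetical stand-in for Spark's State<Double>; it only mimics exists/get/update.
class FakeState {
    private Double value; // null means no state has been stored for this key yet

    boolean exists() { return value != null; }
    Double get() { return value; }
    void update(Double v) { value = v; }
}

class TrackStateSketch {
    // Same running-sum logic as trackState in the quoted code, minus the Spark types.
    static double trackState(Optional<Double> value, FakeState state) {
        double current = state.exists() ? state.get() : 0.0;
        double sum = current + value.orElse(0.0);
        state.update(sum); // the call that was missing: store the new sum for the next batch
        return sum;
    }

    public static void main(String[] args) {
        FakeState state = new FakeState();
        System.out.println(trackState(Optional.of(1.5), state)); // prints 1.5
        System.out.println(trackState(Optional.of(2.0), state)); // prints 3.5
    }
}
```

In the real trackState the equivalent change should be a single state.update(sum); before the return statement. Without it, state.exists() stays false for every key, which would fit stateSnapshots() always coming back empty.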

On Fri, Jan 29, 2016 at 9:36 AM, Sebastian Piu <sebastian....@gmail.com> wrote:

> Hi All,
>
> I'm playing with the new mapWithState functionality but I can't get it
> quite to work yet.
>
> I'm doing two print() calls on the stream:
> 1. after mapWithState() call, first batch shows results - next batches
>    yield empty
> 2. after stateSnapshots(), always yields an empty RDD
>
> Any pointers on what might be wrong?
>
> This is the code I'm running:
>
> final StateSpec state = StateSpec.function(UseCase::trackState);
> JavaPairDStream<GroupingKey, Double> pairs =
>     messages.<GroupingKey, Double>mapToPair(UseCase::mapToPair);
> JavaMapWithStateDStream<GroupingKey, Double, Double, Double> stateMap =
>     pairs.mapWithState(state);
>
> stateMap.print(5);
> stateMap.stateSnapshots()
>     .print(5);
>
> stream.context().remember(minutes(120));
> stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");
>
> private static Optional<Tuple2<GroupingKey, Double>> trackState(
>         Time batchTime, GroupingKey key, Optional<Double> value,
>         State<Double> state) {
>     Double current = state.exists() ? state.get() : 0.0;
>     Double sum = current + value.or(0.0);
>     return Optional.of(new Tuple2<>(key, sum));
> }
>
> Cheers,
>
> Seb