Hi All,
I'm playing with the new mapWithState functionality, but I can't quite get
it to work yet.
I'm doing two print() calls on the stream:
1. After the mapWithState() call: the first batch shows results, but all
subsequent batches are empty.
2. After stateSnapshots(): this always yields an empty RDD.
Any pointers on what might be wrong?
This is the code I'm running:
final StateSpec state = StateSpec.function(UseCase::trackState);

JavaPairDStream<GroupingKey, Double> pairs =
    messages.<GroupingKey, Double>mapToPair(UseCase::mapToPair);

JavaMapWithStateDStream<GroupingKey, Double, Double, Double> stateMap =
    pairs.mapWithState(state);

stateMap.print(5);
stateMap.stateSnapshots().print(5);

stream.context().remember(minutes(120));
stream.context().checkpoint("/rsl/tmp/fxo-checkpoint");

private static Optional<Tuple2<GroupingKey, Double>> trackState(
        Time batchTime, GroupingKey key, Optional<Double> value,
        State<Double> state) {
    Double current = state.exists() ? state.get() : 0.0;
    Double sum = current + value.or(0.0);
    return Optional.of(new Tuple2<>(key, sum));
}
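
To make the intent concrete, here is a small Spark-free sketch of the per-key running sum I expect to see. RunningSumSketch and SimpleState are hypothetical names made up for illustration, not Spark classes, and the sketch assumes the new sum has to be written back to the state explicitly, which trackState above does not do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RunningSumSketch {

    /** Hypothetical stand-in for Spark's State<Double>; not the real class. */
    static final class SimpleState {
        private Double value; // null means no state has been stored yet

        boolean exists() { return value != null; }
        double get() { return value; }
        void update(double newValue) { value = newValue; }
    }

    // One state object per key, kept across calls (i.e. across batches).
    private final Map<String, SimpleState> states = new HashMap<>();

    /** Adds the incoming value to the stored sum for this key and returns the new sum. */
    double track(String key, Optional<Double> value) {
        SimpleState state = states.computeIfAbsent(key, k -> new SimpleState());
        double current = state.exists() ? state.get() : 0.0;
        double sum = current + value.orElse(0.0);
        state.update(sum); // assumption: the sum must be stored to survive to the next call
        return sum;
    }

    /** Returns the current sum of every key that has stored state. */
    Map<String, Double> snapshot() {
        Map<String, Double> result = new HashMap<>();
        states.forEach((key, state) -> {
            if (state.exists()) {
                result.put(key, state.get());
            }
        });
        return result;
    }

    public static void main(String[] args) {
        RunningSumSketch sketch = new RunningSumSketch();
        sketch.track("a", Optional.of(1.5)); // first "batch"
        sketch.track("a", Optional.of(2.0)); // second "batch"
        System.out.println(sketch.snapshot());
    }
}
```

In this sketch the snapshot is only non-empty because track() stores the sum; without the update() call every key would start again from 0.0 and snapshot() would return an empty map.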
Cheers,
Seb