Dear all
I am completely stuck here on persistentAggregate function, what I want to
do is to write json messages into postgresql, I use this package
https://github.com/geoforce/storm-postgresql
In that package, it does the writer job in such code:
final PostgresqlStateConfig config = new PostgresqlStateConfig();
{
config.setUrl(dburl);
config.setTable("test.state");
config.setKeyColumns(new String[]{"a"});
config.setValueColumns(new String[]{"count","bsum","csum"});
config.setType(StateType.NON_TRANSACTIONAL);
config.setCacheSize(1000);
}
stream.persistentAggregate(PostgresqlState.newFactory(config), new
Fields("b", "c"), new CountSumSum(), new Fields("sum"));
Here all the data fields to be written are aggregated fields, like "count",
"bsum","csum", by
static class CountSumSum implements CombinerAggregator<List<Number>> {
@Override
public List<Number> init(TridentTuple tuple) {
return Lists.newArrayList(1L, (Number) tuple.getValue(0), (Number)
tuple.getValue(1));
}
@Override
public List<Number> combine(List<Number> val1, List<Number> val2) {
return Lists.newArrayList(Numbers.add(val1.get(0), val2.get(0)),
Numbers.add(val1.get(1), val2.get(1)), Numbers.add(val1.get(2),
val2.get(2)));
}
@Override
public List<Number> zero() {
return Lists.newArrayList((Number) 0, (Number) 0, (Number) 0);
}
}
in above code, the CountSumSum returns three columns,
"count" = Numbers.add(val1.get(0), val2.get(0))
"bsum" = Numbers.add(val1.get(1), val2.get(1))
,"csum" = Numbers.add(val1.get(2), val2.get(2))
However, what I want to have is the original message without doing
aggregation, like id, events, my question, in order to achieve this target,
what function I should use in persistentAggregate instead of implement
CombinerAggregator.
thanks
Alec