Hi, All
As I mentioned in last few posts, I've developed a trident state to write
kafka data into DB, I am using persistentAggregate to update the DB, see
@SuppressWarnings("serial")
static class EventUpdater implements
ReducerAggregator<List<String>> {
@Override
public List<String> init(){
return null;
}
@Override
public List<String> reduce(List<String> curr, TridentTuple
tuple) {
List<String> updated = null ;
if ( curr == null ) {
String event = (String)
tuple.getValue(1);
updated = Lists.newArrayList(event);
} else {
updated = curr ;
}
System.out.println(updated);
return updated ;
}
}
topology.newStream("topictestspout", kafkaSpout)
.each(new Fields("str"),
new JsonObjectParse(),
new Fields("userid","event"))
.persistentAggregate(PostgresqlState.newFactory(config), new
Fields("userid","event"), new EventUpdater(), new Fields( "eventword"));
But when I send the topology to storm cluster , I got such error:
java.lang.RuntimeException: java.lang.IllegalArgumentException: Tuple
created with wrong number of fields. Expected 1 fields but got 3 fields at
backtype.storm.utils.DisruptorQueue.consumeBatchToCurs
Any ideas?
thanks
AL