Hi, All

As I mentioned in last few posts, I've developed a trident state to write
kafka data into DB, I am using persistentAggregate to update the DB, see


 @SuppressWarnings("serial")
        static class EventUpdater implements
ReducerAggregator<List<String>> {

            @Override
            public List<String> init(){
                     return null;
            }

            @Override
            public List<String> reduce(List<String> curr, TridentTuple
tuple) {
                   List<String> updated = null ;

                   if ( curr == null ) {
                                    String event = (String)
tuple.getValue(1);
                                    updated = Lists.newArrayList(event);
                   } else {
                                    updated = curr ;
                   }
              System.out.println(updated);
              return updated ;
            }
        }


topology.newStream("topictestspout", kafkaSpout)
                                    .each(new Fields("str"),
                                             new JsonObjectParse(),
                                             new Fields("userid","event"))

.persistentAggregate(PostgresqlState.newFactory(config), new
Fields("userid","event"), new EventUpdater(), new Fields( "eventword"));


But when I send the topology to storm cluster , I got such error:

java.lang.RuntimeException: java.lang.IllegalArgumentException: Tuple
created with wrong number of fields. Expected 1 fields but got 3 fields at
backtype.storm.utils.DisruptorQueue.consumeBatchToCurs

Any ideas?

thanks

AL

Reply via email to