Is there a reason the tridentState only consumes one record per run? It used to collect the data in batches.
thanks

On Wed, Nov 19, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:

> System.out.println(updated); prints all the messages from kafka, but
> System.out.println(keys); in multiPut only gets one key instead of all the
> keys passed into the state.
>
> Thanks
>
> On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:
>
>> Hi, Dear all
>>
>> I am using the tridentKafkaSpout to consume data from kafka, and I use
>> persistentAggregate to write the data into a postgresql DB. Here is the code:
>>
>> --------
>> topology.newStream("topictestspout", kafkaSpout)
>>         .each(new Fields("str"),
>>               new JsonObjectParse(),
>>               new Fields("userid", "event"))
>>         .groupBy(new Fields("userid"))
>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>                              new Fields("userid", "event"),
>>                              new EventUpdater(),
>>                              new Fields("eventword"));
>> --------
>>
>> Here is the EventUpdater(), which reduces the data fields going into the
>> state factory:
>>
>> --------
>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>
>>     @Override
>>     public List<String> init() {
>>         return null;
>>     }
>>
>>     @Override
>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>         List<String> updated = null;
>>         if (curr == null) {
>>             String event = (String) tuple.getValue(1);
>>             updated = Lists.newArrayList(event);
>>         } else {
>>             updated = curr;
>>         }
>>         System.out.println(updated);
>>         return updated;
>>     }
>> }
>> --------
>>
>> As you can see, I print the data I want to update, and I can see it on the
>> screen. However, in the PostgresqlState, I am only able to get one row of
>> data. See:
>>
>> --------
>> public class PostgresqlState<T> implements IBackingMap<T> {
>>     .
>>     .
>>     .
>>     public void multiPut(final List<List<Object>> keys, final List<T> values) {
>>         System.out.println(keys);
>> --------
>>
>> In multiPut, I am supposed to write multiple rows, and I actually used to
>> be able to print a lot of keys with the code above. Now, all of a sudden,
>> it prints only a one-element list. See below what I see on the screen:
>> .
>> .
>> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.0}]
>> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.1}]
>> multiPut start .....
>> [[10000]]
>> Below Values are inserted
>> 64969 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
>> 64973 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093, 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
>> 65060 [Thread-16-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
>> .
>> .
>>
>> Therefore, each time I only write one row to the database. One point to
>> mention: the kafka topic I am consuming is not being produced to
>> continuously; the data is already sitting on the hard drive, so I use
>>
>> spoutConf.forceFromStart = true;
>>
>> to consume from the beginning. I really don't know why I can't process the
>> data in batches in the state; it used to work fine, I didn't change the
>> code, and I can't see any errors on the screen. Any idea about this?
>>
>> thanks
>>
>> Alec
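One detail worth checking in the EventUpdater quoted above: the else branch of reduce() returns curr unchanged, so for a given key every event after the first is silently dropped, even though System.out.println(updated) still fires on every tuple. A minimal, Storm-free sketch of the two behaviors (the class and method names here are illustrative, not part of the Trident API):

```java
import java.util.ArrayList;
import java.util.List;

public class ReduceSketch {

    // Mirrors the EventUpdater posted above: once curr exists,
    // new events are never appended to it.
    static List<String> reduceDiscarding(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr; // the new event is dropped here
    }

    // A version that accumulates every event seen for the key.
    static List<String> reduceAccumulating(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<>() : curr;
        updated.add(event);
        return updated;
    }

    public static void main(String[] args) {
        List<String> a = null, b = null;
        for (String e : new String[]{"PageView", "Click", "PageView"}) {
            a = reduceDiscarding(a, e);
            b = reduceAccumulating(b, e);
        }
        System.out.println(a.size()); // 1
        System.out.println(b.size()); // 3
    }
}
```

This would not by itself explain why multiPut sees only one key per batch (that depends on how many tuples the spout emits in each batch), but with the discarding version a key's state never grows beyond its first event, which could easily be mistaken for "only one row is written".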
