The System.out.println(updated); call prints all the messages from Kafka, but System.out.println(keys); in multiPut only prints one key instead of all the keys passed into the state.
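To illustrate outside Storm: in the reduce() quoted below, once curr is non-null the else branch returns curr unchanged, so only the first event per key in a batch is kept. The sketch below (plain Java; TridentTuple is replaced by a bare String event so it runs without Storm on the classpath, which is an assumption and not the original signature) shows that behavior next to an appending variant:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the quoted EventUpdater.reduce() logic, with
// TridentTuple replaced by a plain String event (hypothetical simplification).
public class ReduceSketch {

    // Mirrors the original logic: once curr is non-null, later events are dropped.
    static List<String> reduceOriginal(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr; // subsequent events in the batch are ignored
    }

    // A variant that appends every event instead of keeping only the first.
    static List<String> reduceAppending(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<>() : curr;
        updated.add(event);
        return updated;
    }

    public static void main(String[] args) {
        List<String> a = null;
        List<String> b = null;
        for (String e : new String[] {"e1", "e2", "e3"}) {
            a = reduceOriginal(a, e);
            b = reduceAppending(b, e);
        }
        System.out.println(a); // [e1]
        System.out.println(b); // [e1, e2, e3]
    }
}
```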
Thanks

On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:
> Hi all,
>
> I am using the TridentKafkaSpout to consume data from Kafka, and
> persistentAggregate to write the data into a PostgreSQL DB. Here is the code:
> --------
> topology.newStream("topictestspout", kafkaSpout)
>         .each(new Fields("str"),
>               new JsonObjectParse(),
>               new Fields("userid", "event"))
>         .groupBy(new Fields("userid"))
>         .persistentAggregate(PostgresqlState.newFactory(config),
>                              new Fields("userid", "event"),
>                              new EventUpdater(),
>                              new Fields("eventword"));
> --------
>
> Here is the EventUpdater(), which aggregates the data fields before they
> reach the state factory:
>
> --------
> static class EventUpdater implements ReducerAggregator<List<String>> {
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated = null;
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             updated = Lists.newArrayList(event);
>         } else {
>             updated = curr;
>         }
>         System.out.println(updated);
>         return updated;
>     }
> }
> --------
>
> As you can see, I print the data I want to update, and I can see it on the
> screen. However, in the PostgresqlState I only get one row of data. See:
>
> --------
> public class PostgresqlState<T> implements IBackingMap<T> {
>     .
>     .
>     .
>     public void multiPut(final List<List<Object>> keys, final List<T> values) {
>         System.out.println(keys);
> -------
>
> In multiPut I am supposed to write multiple rows, and the code above used to
> print a lot of keys; now, all of a sudden, it prints only a one-element list.
> See below what I see on the screen:
> .
> .
> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.0}]
> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.1}]
> multiPut start .....
> [[10000]]
> Below Values are inserted
> 64969 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
> 64973 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093, 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
> 65060 [Thread-16-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
> .
> .
>
> So each time I only write one row into the database. One point to mention:
> the Kafka topic I am consuming is not being produced continuously; the data
> is currently sitting on the hard drive, so I use
>
> spoutConf.forceFromStart = true;
>
> to consume from the beginning. I really don't know why I can't process the
> data in batches in the state. It used to work fine; I didn't change the code
> and can't see any errors on the screen. Any ideas?
>
> Thanks
>
> Alec
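For context on the multiPut side: keys.get(i) pairs with values.get(i), and, as far as I understand the IBackingMap contract, Trident calls multiPut once per batch with one entry per distinct grouped key in that batch, so a batch that happens to contain a single userid would legitimately show up as [[10000]]. A plain-Java sketch of that pairing (MultiPutSketch, its string values, and the first-event-wins grouping are hypothetical stand-ins, not the thread's PostgresqlState<T>):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of how IBackingMap.multiPut receives its arguments: keys.get(i)
// is the grouped key and values.get(i) the aggregated value for that key,
// one entry per distinct key seen in the current batch.
public class MultiPutSketch {

    // Stand-in for a state backend: just records one "row" per key.
    static List<String> rows = new ArrayList<>();

    static void multiPut(List<List<Object>> keys, List<String> values) {
        for (int i = 0; i < keys.size(); i++) {
            rows.add(keys.get(i).get(0) + " -> " + values.get(i));
        }
    }

    // Group a batch of (userid, event) pairs the way groupBy("userid") would,
    // then hand one keys/values pair per distinct userid to multiPut.
    // putIfAbsent mirrors the quoted reducer: only the first event per key survives.
    static void processBatch(List<Object[]> batch) {
        Map<Object, String> byKey = new LinkedHashMap<>();
        for (Object[] tuple : batch) {
            byKey.putIfAbsent(tuple[0], (String) tuple[1]);
        }
        List<List<Object>> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        for (Map.Entry<Object, String> e : byKey.entrySet()) {
            keys.add(Collections.singletonList(e.getKey()));
            values.add(e.getValue());
        }
        multiPut(keys, values);
    }

    public static void main(String[] args) {
        // A batch with two distinct userids yields two entries in one multiPut;
        // a batch with a single userid yields exactly one, like [[10000]].
        processBatch(Arrays.asList(
                new Object[] {10000, "PageView-A"},
                new Object[] {10001, "PageView-B"}));
        processBatch(Collections.singletonList(new Object[] {10000, "PageView-C"}));
        System.out.println(rows);
        // [10000 -> PageView-A, 10001 -> PageView-B, 10000 -> PageView-C]
    }
}
```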
