Without changing anything — I even rolled the code back from the repo — I am still not able to get data in batches from Kafka.
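For reference, the way Trident folds a ReducerAggregator over a batch can be modeled in plain Java (Storm types elided; this is a sketch, not Storm itself). `reduceAsPosted` mirrors the EventUpdater quoted below, which keeps only the first event once `curr` is non-null; `reduceAccumulating` is a hypothetical variant that keeps every event for the key:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceFoldDemo {
    // Trident folds a batch as: state = init(); then state = reduce(state, tuple) per tuple.
    // This mirrors the EventUpdater in the thread: only the first event survives.
    static List<String> reduceAsPosted(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr; // later events in the same batch are dropped
    }

    // Hypothetical variant that accumulates every event for the key.
    static List<String> reduceAccumulating(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<String>() : curr;
        updated.add(event);
        return updated;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("e1", "e2", "e3");
        List<String> posted = null;
        List<String> accumulated = null;
        for (String e : batch) {
            posted = reduceAsPosted(posted, e);
            accumulated = reduceAccumulating(accumulated, e);
        }
        System.out.println(posted);      // [e1]
        System.out.println(accumulated); // [e1, e2, e3]
    }
}
```

Note this only affects how many events survive per key; it does not by itself explain why multiPut sees a single key per batch.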
On Tue, Nov 25, 2014 at 11:46 AM, Sa Li <[email protected]> wrote:

> Is there a reason the tridentState only consumes one record per run? It
> used to collect the data in batches.
>
> thanks
>
> On Wed, Nov 19, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:
>
>> System.out.println(updated); prints all the messages from Kafka, but
>> System.out.println(keys); in multiPut gets only one key instead of all
>> the keys passed into the state.
>>
>> Thanks
>>
>> On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:
>>
>>> Hi, Dear all
>>>
>>> I am using the tridentKafkaSpout to consume the data from Kafka, and
>>> persistentAggregate to write the data into a PostgreSQL DB. Here is
>>> the code:
>>>
>>> --------
>>> topology.newStream("topictestspout", kafkaSpout)
>>>         .each(new Fields("str"),
>>>               new JsonObjectParse(),
>>>               new Fields("userid", "event"))
>>>         .groupBy(new Fields("userid"))
>>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>>                              new Fields("userid", "event"),
>>>                              new EventUpdater(),
>>>                              new Fields("eventword"));
>>> --------
>>>
>>> Here is EventUpdater(), which reduces the data fields going into the
>>> state factory:
>>>
>>> --------
>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>     @Override
>>>     public List<String> init() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>         List<String> updated = null;
>>>         if (curr == null) {
>>>             String event = (String) tuple.getValue(1);
>>>             updated = Lists.newArrayList(event);
>>>         } else {
>>>             updated = curr;
>>>         }
>>>         System.out.println(updated);
>>>         return updated;
>>>     }
>>> }
>>> --------
>>>
>>> As you can see, I print the data I want to update, and I can see it on
>>> the screen. However, in the PostgresqlState I am only able to get one
>>> row of data. See:
>>>
>>> --------
>>> public class PostgresqlState<T> implements IBackingMap<T> {
>>>     .
>>>     .
>>>     .
>>>     public void multiPut(final List<List<Object>> keys,
>>>                          final List<T> values) {
>>>         System.out.println(keys);
>>> -------
>>>
>>> multiPut is supposed to write multiple rows, and the code above used
>>> to print a lot of keys; now, all of a sudden, it prints only a
>>> single-element list. Below is what I see on the screen:
>>>
>>> .
>>> .
>>> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.0}]
>>> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.1}]
>>> multiPut start .....
>>> [[10000]]
>>> Below Values are inserted
>>> 64969 [Thread-8-$spoutcoord-spout0] INFO
>>> storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing
>>> because 60000ms have expired
>>> 64973 [Thread-8-$spoutcoord-spout0] INFO
>>> storm.ingress.kafka.DynamicBrokersReader - Read partition info from
>>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093,
>>> 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
>>> 65060 [Thread-16-spout0] INFO
>>> storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing
>>> because 60000ms have expired
>>> .
>>> .
>>>
>>> So each time I write only one row into the database. One point to
>>> mention: the topic I am consuming is not being produced continuously;
>>> the data is currently sitting on disk, so I use
>>>
>>> spoutConf.forceFromStart = true;
>>>
>>> to consume from the beginning. I really don't know why I can't process
>>> the data in batches in the state. It used to work fine, I didn't change
>>> the code, and I don't see any errors on the screen. Any idea about this?
>>>
>>> thanks
>>>
>>> Alec
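For anyone comparing against the output above: multiPut receives two parallel lists, where keys.get(i) pairs with values.get(i), one entry per distinct grouped key in the batch. With two userids (10000 and 10001) in one batch, the expected keys argument is [[10000], [10001]], not [[10000]] as in the posted log. A minimal plain-Java sketch (the `renderRows` helper is hypothetical, for illustration only):

```java
import java.util.Arrays;
import java.util.List;

public class MultiPutDemo {
    // Models IBackingMap.multiPut's contract: keys.get(i) pairs with values.get(i),
    // so a batch containing two grouped keys should produce two rows.
    static String renderRows(List<List<Object>> keys, List<String> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            sb.append(keys.get(i).get(0)).append("=").append(values.get(i));
            if (i < keys.size() - 1) {
                sb.append("; ");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two userids in one batch should yield two key/value pairs.
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList(10000),
                Arrays.<Object>asList(10001));
        List<String> values = Arrays.asList("[event-10000]", "[event-10001]");
        System.out.println(renderRows(keys, values));
        // 10000=[event-10000]; 10001=[event-10001]
    }
}
```

If multiPut is only ever handed a single key, the problem is upstream of the state (batch formation in the spout or the groupBy), not in the database write itself.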
