The System.out.println(updated); call prints all the messages from Kafka, but
System.out.println(keys); in multiPut only gets one key instead of all the
keys passed into the state.


Thanks


On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:

> Hi all,
>
> I am using the tridentKafkaSpout to consume data from Kafka, and
> persistentAggregate to write the data into a PostgreSQL DB. Here is the code:
> --------
>  topology.newStream("topictestspout", kafkaSpout)
>          .each(new Fields("str"),
>                new JsonObjectParse(),
>                new Fields("userid", "event"))
>          .groupBy(new Fields("userid"))
>          .persistentAggregate(PostgresqlState.newFactory(config),
>                               new Fields("userid", "event"),
>                               new EventUpdater(),
>                               new Fields("eventword"));
> --------
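To make the data flow of this topology concrete, here is a plain-Java sketch with no Storm dependency of what `groupBy(new Fields("userid"))` followed by `persistentAggregate` does within one batch: tuples are grouped by userid, and the backing map's `multiPut` receives one key per distinct userid rather than one per tuple. The class and method names are hypothetical, tuples are modeled as `[userid, event]` string pairs, and the sketch ignores state partitioning and the lookup of already-stored values, so treat it as a simplified model rather than Trident's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free model of one batch flowing through
// groupBy("userid") + persistentAggregate(...).
class BatchGroupingSketch {

    // Groups [userid, event] tuples by userid, preserving first-seen order.
    static Map<String, List<String>> groupByUser(List<String[]> batch) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] tuple : batch) {
            groups.computeIfAbsent(tuple[0], k -> new ArrayList<>()).add(tuple[1]);
        }
        return groups;
    }

    // Keys a backing map's multiPut would see for this batch:
    // one single-field key per distinct userid.
    static List<List<Object>> multiPutKeys(List<String[]> batch) {
        List<List<Object>> keys = new ArrayList<>();
        for (String userId : groupByUser(batch).keySet()) {
            List<Object> key = new ArrayList<>();
            key.add(userId);
            keys.add(key);
        }
        return keys;
    }
}
```

In this model, a batch with events for users 10000, 10000 and 10001 yields the keys `[[10000], [10001]]`, while a batch whose tuples all share one userid yields a single key such as `[[10000]]`. That is one possible way to get a one-element key list, not a diagnosis of this topology.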
>
> Here is EventUpdater, the reducer whose output values are passed to the state:
>
> --------
> static class EventUpdater implements ReducerAggregator<List<String>> {
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated;
>         if (curr == null) {
>             // first tuple for this key: start a list with its "event" field
>             String event = (String) tuple.getValue(1);
>             updated = Lists.newArrayList(event);
>         } else {
>             // an existing value is kept as-is; this tuple's event is not added
>             updated = curr;
>         }
>         System.out.println(updated);
>         return updated;
>     }
> }
> --------
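The effect of this reduce logic can be checked without Storm. The sketch below copies the same branching into a hypothetical helper class that takes the event string directly instead of reading `tuple.getValue(1)` from a `TridentTuple`, and folds one group's events starting from `init()`, assuming no value is already stored for the key. Because the `else` branch returns `curr` unchanged, only the first event of each group survives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Storm-free copy of EventUpdater's branching; the event
// string is passed in directly instead of coming from a TridentTuple.
class EventUpdaterSketch {

    static List<String> init() {
        return null;
    }

    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr; // later events for the same key are ignored
    }

    // Folds one group's events, starting from init() as for a key with no stored value.
    static List<String> fold(List<String> events) {
        List<String> acc = init();
        for (String e : events) {
            acc = reduce(acc, e);
        }
        return acc;
    }
}
```

Folding the events `e1, e2` gives `[e1]`. Note this affects the value stored per key, not how many keys `multiPut` receives.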
>
> As you can see, I print the data I want to update, and it shows up on the
> screen. However, in PostgresqlState I am only able to get one row of data.
> See:
>
> --------
> public class PostgresqlState<T> implements IBackingMap<T> {
>     ...
>     public void multiPut(final List<List<Object>> keys, final List<T> values) {
>         System.out.println(keys);
>
>
> -------
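For reference, here is a minimal sketch of a multiPut body that writes every key/value pair it receives as one JDBC batch, using only standard `java.sql` calls (`prepareStatement`, `setObject`, `setString`, `addBatch`, `executeBatch`). The `PostgresBatchWriter` class, the `events` table and its `userid`/`eventword` columns are hypothetical, and a plain INSERT is assumed rather than an update of existing rows; adapt it to the real schema.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical sketch: write all pairs handed to multiPut in one JDBC batch.
// Table and column names are assumptions, not taken from the original code.
class PostgresBatchWriter {

    static final String INSERT_SQL = "INSERT INTO events (userid, eventword) VALUES (?, ?)";

    // multiPut receives keys and values as parallel lists of equal length.
    static <T> void writeBatch(Connection conn, List<List<Object>> keys, List<T> values)
            throws SQLException {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same size");
        }
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (int i = 0; i < keys.size(); i++) {
                ps.setObject(1, keys.get(i).get(0)); // userid is the only grouping field
                // value stored via toString() purely for illustration
                ps.setString(2, String.valueOf(values.get(i)));
                ps.addBatch();
            }
            ps.executeBatch(); // submits every row of this multiPut call as one batch
        }
    }
}
```

Inside multiPut this would be called as `PostgresBatchWriter.writeBatch(connection, keys, values)`, where `connection` is whatever JDBC connection the state already holds. It writes exactly as many rows as there are keys, so it will not help if `keys` itself has only one element.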
> In multiPut I expect to write multiple rows, and I used to see many keys
> printed by the code above. Now, all of a sudden, it prints only a
> one-element list; see below for what I see on the screen.
> .
> .
>
> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.0}]
>
> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.1}]
> multiPut start .....
> [[10000]]
> Below Values are inserted
> 64969 [Thread-8-$spoutcoord-spout0] INFO  storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
> 64973 [Thread-8-$spoutcoord-spout0] INFO  storm.ingress.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093, 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
> 65060 [Thread-16-spout0] INFO  storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
> .
> .
>
> So each time I write only one row to the database. One point to mention:
> the Kafka topic I am consuming is not being produced to continuously; its
> data is currently sitting on disk, so I use
>  spoutConf.forceFromStart = true;
>
> to consume from the beginning. I really don't know why the state no longer
> receives the data in batches. It used to work fine, I didn't change the
> code, and I can't see any error on the screen. Any idea about this?
>
>
> thanks
>
> Alec
