Is there a reason the Trident state would consume only one record per run?
It used to collect the data in batches.

thanks


On Wed, Nov 19, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:

>  The System.out.println(updated); call prints all the messages from Kafka,
> but System.out.println(keys); in multiPut gets only one key instead of all
> the keys passed into the state.
>
>
> Thanks
>
>
> On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:
>
>> Hi, Dear all
>>
>> I am using the TridentKafkaSpout to consume data from Kafka, and
>> persistentAggregate to write the data into a PostgreSQL DB. Here is the code:
>> --------
>>  topology.newStream("topictestspout", kafkaSpout)
>>      .each(new Fields("str"),
>>            new JsonObjectParse(),
>>            new Fields("userid", "event"))
>>      .groupBy(new Fields("userid"))
>>      .persistentAggregate(PostgresqlState.newFactory(config),
>>                           new Fields("userid", "event"),
>>                           new EventUpdater(),
>>                           new Fields("eventword"));
>> --------
>>
>> Here is EventUpdater, the aggregator that reduces the data fields going
>> into the state factory:
>>
>> --------
>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>     @Override
>>     public List<String> init() {
>>         return null;
>>     }
>>
>>     @Override
>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>         List<String> updated = null;
>>         if (curr == null) {
>>             String event = (String) tuple.getValue(1);
>>             updated = Lists.newArrayList(event);
>>         } else {
>>             updated = curr;
>>         }
>>         System.out.println(updated);
>>         return updated;
>>     }
>> }
>> --------
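>> One thing worth noting (an observation, not necessarily the cause of the
>> one-key multiPut): the else branch above returns curr without appending the
>> new tuple's event, so only the first event per key ever accumulates. A
>> framework-free sketch of an appending reduce, with a hypothetical class name
>> and a plain String standing in for the TridentTuple access:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of an accumulating reduce step: append every event to
// the running list instead of seeding it only when curr is null.
// EventReduceSketch and the String parameter are hypothetical stand-ins.
public class EventReduceSketch {
    public static List<String> reduce(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<String>() : curr;
        updated.add(event); // append on every call, not just the first
        return updated;
    }

    public static void main(String[] args) {
        List<String> acc = reduce(null, "PageView");
        acc = reduce(acc, "Click");
        System.out.println(acc); // prints [PageView, Click]
    }
}
```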
>>
>> As you can see, I print the data I want to update, and it shows up on the
>> screen. However, in the PostgresqlState I am only able to get one row of
>> data. See:
>>
>> --------
>> public class PostgresqlState<T> implements IBackingMap<T> {
>>     ...
>>     public void multiPut(final List<List<Object>> keys, final List<T> values) {
>>         System.out.println(keys);
>>     ...
>> --------
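>> For reference, here is a framework-free sketch of the multiPut contract as I
>> understand it: keys.get(i) holds the grouped key (the userid) and
>> values.get(i) the aggregated value for that key, so the two lists are walked
>> in lockstep, one database row per grouped key in the batch. The row
>> formatting is a hypothetical stand-in for the actual Postgres insert:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of multiPut's keys/values pairing; MultiPutSketch is a
// hypothetical stand-in, not the actual PostgresqlState class.
public class MultiPutSketch {
    public static int multiPut(List<List<Object>> keys, List<String> values) {
        int rows = 0;
        for (int i = 0; i < keys.size(); i++) {
            Object userid = keys.get(i).get(0); // the groupBy key
            String events = values.get(i);      // aggregate for that key
            System.out.println("row: " + userid + " -> " + events);
            rows++; // one row per grouped key in the batch
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
            Arrays.<Object>asList(10000),
            Arrays.<Object>asList(10001));
        List<String> values = Arrays.asList("[PageView]", "[PageView]");
        System.out.println(multiPut(keys, values)); // prints 2
    }
}
```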
>> In multiPut I am supposed to write multiple rows, and previously the code
>> above did print lots of keys; now, all of a sudden, it prints only a
>> single-element list. See below what I see on the screen.
>> .
>> .
>>
>> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.0}]
>>
>> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.1}]
>> multiPut start .....
>> [[10000]]
>> Below Values are inserted
>> 64969 [Thread-8-$spoutcoord-spout0] INFO
>>  storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing
>> because 60000ms have expired
>> 64973 [Thread-8-$spoutcoord-spout0] INFO
>>  storm.ingress.kafka.DynamicBrokersReader - Read partition info from
>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093,
>> 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
>> 65060 [Thread-16-spout0] INFO  storm.ingress.kafka.trident.ZkBrokerReader
>> - brokers need refreshing because 60000ms have expired
>> .
>> .
>>
>> Therefore each time I only write one row to the database. One point to
>> mention: the topic I am consuming is not being sent by the Kafka producer
>> continuously; the data is currently sitting on the hard drive, so I use
>>  spoutConf.forceFromStart = true;
>>
>> to consume from the beginning. I really don't know why I can't process the
>> data in batches in the state. It used to work fine; I didn't change the code
>> and can't see any error on the screen. Any idea about this?
>>
>>
>> thanks
>>
>> Alec
>>
