Without changing anything, and I actually rollback the code from repo and
still not able to getting data in batch from kafka.

On Tue, Nov 25, 2014 at 11:46 AM, Sa Li <[email protected]> wrote:

> Is there a reason to make the tridentState only consume a record every
> running, it used to collect the data in batch.
>
> thanks
>
>
> On Wed, Nov 19, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:
>
>>  System.out.println(updated);  call print all the messages from kafka,
>> but
>> System.out.println(keys); in multiPut only get one key instead of all the
>> keys passed into state.
>>
>>
>> Thanks
>>
>>
>> On Wed, Nov 19, 2014 at 10:43 AM, Sa Li <[email protected]> wrote:
>>
>>> Hi, Dear all
>>>
>>> I am using the tridentKafkaSpout to consume the data from kafka, and use
>>> persistentAggregate to update the data into postgresql DB. Here the code
>>> --------
>>>  topology.newStream("topictestspout", kafkaSpout)
>>>                               .each(new Fields("str"),
>>>                                        new JsonObjectParse(),
>>>                                        new Fields("userid","event"))
>>>                               .groupBy(new Fields("userid"))
>>>
>>> .persistentAggregate(PostgresqlState.newFactory(config), new
>>> Fields("userid","event"), new EventUpdater(), new Fields( "eventword"));
>>> --------
>>>
>>> Here is the EventUpdater(), the data fields into stateFactory
>>>
>>> --------
>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>            @Override
>>>             public List<String> init(){
>>>                      return null;
>>>             }
>>>             @Override
>>>             public List<String> reduce(List<String> curr, TridentTuple
>>> tuple) {
>>>                          List<String> updated = null ;
>>>                          if ( curr == null ) {
>>>                                                  String event = (String)
>>> tuple.getValue(1);
>>>                                                  updated =
>>> Lists.newArrayList(event);
>>>                          } else {
>>>                                                  updated = curr ;
>>>                          }
>>>                     System.out.println(updated);
>>>                     return updated ;
>>>            }
>>> }
>>> --------
>>>
>>> As can see, I print the data I want to update, and I can see them on the
>>> screen. However, in the postgresqlState, I am only able to get one row of
>>> data. See,
>>>
>>> --------
>>> public class PostgresqlState<T> implements IBackingMap<T> {
>>> .
>>> .
>>> .
>>>               public void multiPut(final List<List<Object>> keys, final
>>> List<T> values) {
>>>                                 System.out.println(keys);
>>>
>>>
>>> -------
>>> In multiPut, I suppose to write multiple rows, and actually I can print
>>> lot of keys by above code, now all of sudden, it just print only one
>>> element list, see below what I see on the screen.
>>> .
>>> .
>>>
>>> [{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.0}]
>>>
>>> [{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,rendertime":279.0,"totaltime":1410.1}]
>>> multiPut start .....
>>> [[10000]]
>>> Below Values are inserted
>>> 64969 [Thread-8-$spoutcoord-spout0] INFO
>>>  storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing
>>> because 60000ms have expired
>>> 64973 [Thread-8-$spoutcoord-spout0] INFO
>>>  storm.ingress.kafka.DynamicBrokersReader - Read partition info from
>>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093,
>>> 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
>>> 65060 [Thread-16-spout0] INFO
>>>  storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing
>>> because 60000ms have expired
>>> .
>>> .
>>>
>>> Therefore each time I only write one row in database, one point to
>>> mention, the topic in kafka producer I am consuming is not persistently
>>> sent, it currently sitting in harddrive, so I use
>>>  spoutConf.forceFromStart = true;
>>>
>>> to consume from beginning. I really don't know why I can't process the
>>> data in batch in state, it used to work fine, I didn't change code and
>>> can't see any error from screen. Any idea about this?
>>>
>>>
>>> thanks
>>>
>>> Alec
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>

Reply via email to