Hi all,
I am using the TridentKafkaSpout to consume data from Kafka, and
persistentAggregate to write the aggregated data into a PostgreSQL DB. Here is the code:
--------
topology.newStream("topictestspout", kafkaSpout)
        .each(new Fields("str"),
              new JsonObjectParse(),
              new Fields("userid", "event"))
        .groupBy(new Fields("userid"))
        .persistentAggregate(PostgresqlState.newFactory(config),
              new Fields("userid", "event"),
              new EventUpdater(),
              new Fields("eventword"));
--------
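For context, JsonObjectParse pulls the user id and the event out of the raw JSON string from the spout. Below is a simplified, dependency-free sketch of the idea (my real class is a Storm Function and uses a JSON library; the regex version and all names here are only illustrative):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative stand-in for JsonObjectParse: pull "user" and "eventType"
// out of the raw JSON string, the way the Function emits
// new Fields("userid", "event"). Not the real implementation.
public class JsonObjectParseSketch {
    private static final Pattern USER = Pattern.compile("\"user\":(\\d+)");
    private static final Pattern EVENT = Pattern.compile("\"eventType\":\"([^\"]+)\"");

    // Returns {userid, event}, or null if either field is missing.
    static String[] parse(String json) {
        Matcher u = USER.matcher(json);
        Matcher e = EVENT.matcher(json);
        if (u.find() && e.find()) {
            return new String[] { u.group(1), e.group(1) };
        }
        return null;
    }

    public static void main(String[] args) {
        String json = "{\"time\":1416271273,\"user\":10000,\"eventType\":\"PageView\"}";
        String[] out = parse(json);
        System.out.println(out[0] + " " + out[1]); // prints "10000 PageView"
    }
}
```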
Here is EventUpdater, which produces the value that is handed to the state factory:
--------
static class EventUpdater implements ReducerAggregator<List<String>> {
    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        String event = (String) tuple.getValue(1);
        List<String> updated;
        if (curr == null) {
            updated = Lists.newArrayList(event);
        } else {
            // append the new event instead of dropping it
            curr.add(event);
            updated = curr;
        }
        System.out.println(updated);
        return updated;
    }
}
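As I understand it, Trident calls init() once per key per batch and then folds every tuple for that key through reduce() before the result reaches the state. A dependency-free sketch of that folding (the names are mine for illustration, not the Storm API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates how a ReducerAggregator is folded over the grouped tuples of
// one batch: per key, start from init() (null here) and reduce each event in.
public class ReduceFoldSketch {
    static Map<String, List<String>> fold(String[][] tuples) {
        Map<String, List<String>> perKey = new LinkedHashMap<>();
        for (String[] t : tuples) {
            List<String> curr = perKey.get(t[0]);   // null plays the role of init()
            if (curr == null) {
                curr = new ArrayList<>();
                perKey.put(t[0], curr);
            }
            curr.add(t[1]);                         // reduce(): append the event
        }
        return perKey;
    }

    public static void main(String[] args) {
        // (userid, event) pairs as they might arrive in one batch
        String[][] batch = { {"10000", "PageView"}, {"10001", "PageView"}, {"10000", "Click"} };
        System.out.println(fold(batch)); // {10000=[PageView, Click], 10001=[PageView]}
    }
}
```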
--------
As you can see, I print the data I want to update, and I can see it on the
screen. However, in the PostgresqlState I am only able to get one row of
data. See:
--------
public class PostgresqlState<T> implements IBackingMap<T> {
    .
    .
    .
    public void multiPut(final List<List<Object>> keys, final List<T> values) {
        System.out.println(keys);
-------
In multiPut I am supposed to write multiple rows, and the println above used
to print lots of keys. Now, all of a sudden, it prints only a one-element
list. Here is what I see on the screen:
.
.
[{"time":1416271273,"machine":"FITSUM","user":10000,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.0}]
[{"time":1416271273,"machine":"FITSUM","user":10001,"eventType":"PageView","method":"GET","host":"localhost","page":"/mysite/mypath/v1","querystring":{"param1":"value1","param2":"value2"},"accepttypes":["application/json"],"loadtime":864.0,"rendertime":279.0,"totaltime":1410.1}]
multiPut start .....
[[10000]]
Below Values are inserted
64969 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
64973 [Thread-8-$spoutcoord-spout0] INFO storm.ingress.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.28:9093, 1=10.100.70.28:9094, 2=10.100.70.29:9092}}
65060 [Thread-16-spout0] INFO storm.ingress.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
.
.
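For reference, this is how I understand the state hands data to multiPut: one entry in keys for every grouped key of the batch, with the aggregated value at the same index in values. A simplified, dependency-free stand-in (illustrative names only, not my real JDBC code):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for PostgresqlState.multiPut. With a healthy batch,
// keys holds ALL grouped keys of that batch, e.g. [[10000], [10001]] --
// not just a single-element list like I am seeing now.
public class MultiPutSketch {
    // Returns the number of rows that would be written; the real state
    // would issue a batched JDBC upsert, one row per key.
    static int multiPut(List<List<Object>> keys, List<List<String>> values) {
        for (int i = 0; i < keys.size(); i++) {
            System.out.println("row: key=" + keys.get(i) + " value=" + values.get(i));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList(10000), Arrays.<Object>asList(10001));
        List<List<String>> values = Arrays.asList(
                Arrays.asList("PageView", "Click"), Arrays.asList("PageView"));
        multiPut(keys, values); // writes two rows, one per key
    }
}
```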
Therefore each time I only write one row to the database. One thing to
mention: the Kafka topic I am consuming is not being produced to
continuously; the data is already sitting on disk, so I use
spoutConf.forceFromStart = true;
to consume from the beginning. I really don't know why I can no longer
process the data in batches in the state. It used to work fine; I didn't
change any code, and I don't see any errors on the screen. Any idea about this?
thanks
Alec