Hello all,

Here is what I want to do: get messages from Kafka -> PostgreSQL DB.
I am currently using OpaqueTridentKafkaSpout to receive the data, which is in the form of JSON and is parsed in a JsonObjectParse class:
@SuppressWarnings("serial")
public static class JsonObjectParse extends BaseFunction {
    @Override
    public final void execute(final TridentTuple tuple, final TridentCollector collector) {
        String val = tuple.getString(0);
        int userid;
        try {
            // the JSONObject constructor can also throw on malformed input,
            // so it belongs inside the try as well
            JSONObject json = new JSONObject(val);
            userid = json.getInt("user");
        } catch (JSONException e) {
            userid = -1;
        }
        collector.emit(new Values(userid, val));
    }
}
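For reference, the fallback behavior I want from the class above is just "extract the user id, or emit -1 when it is missing or unparsable". Here is a standalone plain-Java illustration of that logic (it uses a simple regex instead of org.json only so it runs without extra jars; the names are mine, not Storm API):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UserIdFallback {
    // crude stand-in for json.getInt("user"); real code should use a JSON parser
    static final Pattern USER = Pattern.compile("\"user\"\\s*:\\s*(-?\\d+)");

    static int extractUserId(String json) {
        Matcher m = USER.matcher(json);
        // fall back to -1 when the field is absent, as in JsonObjectParse
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    public static void main(String[] args) {
        System.out.println(extractUserId("{\"user\": 42, \"event\": \"click\"}")); // 42
        System.out.println(extractUserId("{\"event\": \"click\"}"));               // -1
    }
}
```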
After that, I am updating the DB with persistentAggregate():
topology.newStream("topictestspout", kafkaSpout)
        .parallelismHint(4)
        .shuffle()
        .each(new Fields("str"), new JsonObjectParse(), new Fields("userid", "event"))
        .groupBy(new Fields("userid"))
        .persistentAggregate(PostgresqlState.newFactory(config),
                new Fields("userid", "event"),
                new EventUpdater(),
                new Fields("eventword"));
where PostgresqlState.newFactory(config) comes from
https://github.com/geoforce/storm-postgresql
and I wrote EventUpdater() like this:
@SuppressWarnings("serial")
static class EventUpdater implements ReducerAggregator<List<String>> {
    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated;
        if (curr == null) {
            String event = (String) tuple.getValue(1);
            updated = Lists.newArrayList(event);
        } else {
            updated = curr;
        }
        System.out.println(updated);
        return updated;
    }
}
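As I understand it, Trident applies a ReducerAggregator to a batch by starting from init() and folding reduce() over each tuple in the batch. Mirroring my reduce() above in plain Java (names are mine, not Storm API) shows that, as written, it only ever keeps the first event of each batch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    static List<String> init() {
        return null;
    }

    // mirrors EventUpdater.reduce: only the first event survives
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = init();
        // fold over a batch of three events, the way Trident would
        for (String e : Arrays.asList("a", "b", "c")) {
            state = reduce(state, e);
        }
        System.out.println(state); // [a]
    }
}
```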
I am able to update the DB with a single row each time; for example, when I consume a topic from the beginning with spoutConf.forceFromStart = true;. My question is: how can I consume the data from the kafkaSpout in batches, load it into the state, and write it to the DB in bulk? I am wondering if I can accumulate data in JsonObjectParse and bulk-copy it to the DB.
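Roughly, what I have in mind is something like this plain-Java sketch (a hypothetical helper of my own, not Storm or storm-postgresql API): buffer events and hand them to a flush callback, which in the real topology would do a PostgreSQL COPY / batch INSERT, once the buffer reaches a batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EventBuffer {
    private final int batchSize;
    // in the real thing this callback would run a bulk COPY into Postgres
    private final Consumer<List<String>> flusher;
    private final List<String> buffer = new ArrayList<>();

    public EventBuffer(int batchSize, Consumer<List<String>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    public void add(String event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // flush whatever is buffered, e.g. at the end of a Trident batch
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

Is this accumulate-then-flush pattern the right way to do it in a Trident function/state, or is there a built-in way to size the batches?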
Thanks,
Alec