Hello, all

Here is what I want to do:

getting messages from Kafka -> PostgreSQL DB

I am currently using OpaqueTridentKafkaSpout to receive the data, which is
JSON, parsed in a JsonObjectParse class:

 @SuppressWarnings("serial")
 public static class JsonObjectParse extends BaseFunction {
   @Override
   public final void execute(final TridentTuple tuple,
                             final TridentCollector collector) {
     String val = tuple.getString(0);
     int userid;
     try {
       // JSONObject(String) can also throw JSONException on malformed
       // input, so construct it inside the try as well
       JSONObject json = new JSONObject(val);
       userid = json.getInt("user");
     } catch (JSONException e) {
       userid = -1;
     }
     collector.emit(new Values(userid, val));
   }
 }


After that, I update the DB with persistentAggregate():

topology.newStream("topictestspout", kafkaSpout)
        .parallelismHint(4)
        .shuffle()
        .each(new Fields("str"),
              new JsonObjectParse(),
              new Fields("userid", "event"))
        .groupBy(new Fields("userid"))
        .persistentAggregate(PostgresqlState.newFactory(config),
                             new Fields("userid", "event"),
                             new EventUpdater(),
                             new Fields("eventword"));

where PostgresqlState.newFactory(config) comes from
https://github.com/geoforce/storm-postgresql,

and I wrote EventUpdater like this:

       @SuppressWarnings("serial")
        static class EventUpdater implements ReducerAggregator<List<String>> {
            @Override
            public List<String> init() {
                return null;
            }

            @Override
            public List<String> reduce(List<String> curr, TridentTuple tuple) {
                String event = (String) tuple.getValue(1);
                List<String> updated;
                if (curr == null) {
                    updated = Lists.newArrayList(event);
                } else {
                    updated = curr;
                    // append so every event in the batch is kept,
                    // not just the first one per key
                    updated.add(event);
                }
                System.out.println(updated);
                return updated;
            }
        }
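For context on what the aggregator sees: Trident calls init() once per key
per batch and then reduce() once per tuple in that batch, so a reduce that
appends each event ends up holding the whole batch for that key. The class
and method names below are only illustrative (a plain-Java simulation, not
the Storm API), just to show that accumulation outside Storm:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of how Trident drives a ReducerAggregator for one
// key in one batch: init() once, then reduce() once per tuple. The "tuples"
// here are just event strings.
public class ReduceSim {
    static List<String> init() {
        return null; // Trident passes null into the first reduce() call
    }

    // A reduce that appends every event, so the final value holds all
    // events of the batch for this key rather than only the first one.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(List.of(event));
        }
        curr.add(event);
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = init();
        for (String ev : List.of("e1", "e2", "e3")) {
            state = reduce(state, ev);
        }
        System.out.println(state); // [e1, e2, e3]
    }
}
```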


I am able to update the DB with a single row each time, for example when I
consume a topic from the beginning (spoutConf.forceFromStart = true;). My
question is: how can I consume the data from the kafkaSpout in batches,
load it into the state, and write it to the DB in bulk? I am wondering
whether I could accumulate the data in JsonObjectParse and bulk-copy it to
the DB.
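One direction I'm considering (just a sketch, not the storm-postgresql
API): since persistentAggregate already hands the State one aggregated
value per key per batch, the bulk write could live in the State's
multiPut(), buffering the rows and issuing a single JDBC batch insert.
Stripped of Storm and JDBC so it stands alone, the buffering part would
look roughly like this (BatchBuffer and its methods are hypothetical
names):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffers (userid, event) rows for one Trident batch,
// then flushes them all at once. In a real State implementation, flush()
// would issue one JDBC PreparedStatement batch insert
// (addBatch() per row, then executeBatch()) instead of just clearing.
public class BatchBuffer {
    private final List<String[]> rows = new ArrayList<>();

    // Called once per key, as multiPut() would be driven by Trident.
    public void add(int userid, String event) {
        rows.add(new String[] { String.valueOf(userid), event });
    }

    // Flushes every buffered row in one shot and empties the buffer;
    // returns how many rows were written.
    public int flush() {
        int n = rows.size();
        rows.clear();
        return n;
    }

    public static void main(String[] args) {
        BatchBuffer buf = new BatchBuffer();
        buf.add(7, "{\"user\":7}");
        buf.add(9, "{\"user\":9}");
        System.out.println(buf.flush()); // prints 2
    }
}
```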


thanks

Alec
