I have not tried extending BaseFunction, but I would assume that you should be implementing the prepare method and setting connector and connection fields in there.
On Mon Dec 08 2014 at 10:37:19 PM Sa Li <[email protected]> wrote: > Hi, All > > I have been struggling for the work to getting data from Kafka and write > data into database (postgresql). I thought it supposed to a simple task, > but seems I stuck here. I’ve tried two ways. both of them didn’t work > properly, here are my problems: > > 1. storm-kafka spout along with storm-RDBMS. I can get data from kafka by > storm-kafka spout, but with the problem to write into DB, even I can write > into DB, it write into DB per row not bulk copy. > > 2. I used trident topology, and persistAggregator to update DB, with this > method, I can write into DB, but can’t do bulk copy. Then I wrote a > baseFunction to do that, still having trouble to pass connection into it, > should I use singleton connection pool? > > > public static class WriteDB extends BaseFunction { > PreparedStatement ps = null ; > postgresConnector connector = new postgresConnector() ; > Connection conn = connector.connectToDatabaseOrDie(); > @Override > public final void execute(final TridentTuple tuple, final > TridentCollector collector) { > int user = tuple.getInteger(0); > String value = tuple.getString(1); > final StringBuilder queryBuilder = new StringBuilder() > .append("INSERT INTO test.state(userid, event) > VALUES(") > .append(user) > .append(", '") > .append(value) > .append("')"); > try { > ps = conn.prepareStatement(queryBuilder.toString()) ; > ps.execute(); > collector.emit(new Values(tuple.getStringByField( > "event"))); > } catch (SQLException ex) { > System.err.println("Caught IOException: " + > ex.getMessage()); > }finally { > if (ps != null) { > try { > ps.close(); > } catch (SQLException ex) { > } > } > } > } > } > > I am sure writing data from kafka into database should not be a big issue > with batch mode, however I can’t find a proper way to do it. Any help? > > > thanks > > Alec
