I have not tried extending BaseFunction, but I would expect that you should
override the prepare method and initialize the connector and connection
fields there, rather than in field initializers.
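
To make that concrete, here is a rough, untested-against-Storm sketch of the
idea using only plain JDBC. The names BatchedStateWriter, ConnectionFactory
and batchSize are made up for illustration; the table and columns are taken
from your INSERT. The assumption is that your BaseFunction would hold one of
these as a field, call open() from prepare(), add() from execute() and
close() from cleanup(). Note that this is JDBC statement batching, not
PostgreSQL COPY.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper: buffers rows and writes them as one JDBC batch per flush. */
class BatchedStateWriter implements Serializable {

    /** Serializable factory, so only the recipe for a connection is shipped to workers. */
    public interface ConnectionFactory extends Serializable {
        Connection open() throws SQLException;
    }

    // Placeholders instead of string concatenation: reusable and safe against quoting issues.
    private static final String SQL =
            "INSERT INTO test.state(userid, event) VALUES (?, ?)";

    private final ConnectionFactory factory;
    private final int batchSize;

    // transient: JDBC objects are not serializable and must be created on the worker.
    private transient Connection conn;
    private transient PreparedStatement ps;
    private transient int pending;

    BatchedStateWriter(ConnectionFactory factory, int batchSize) {
        this.factory = factory;
        this.batchSize = batchSize;
    }

    /** Intended to be called from prepare(): opens the connection once per task. */
    public void open() throws SQLException {
        conn = factory.open();
        ps = conn.prepareStatement(SQL);
        pending = 0;
    }

    /** Intended to be called from execute(): queues one row, flushing when the batch is full. */
    public void add(int userId, String event) throws SQLException {
        ps.setInt(1, userId);
        ps.setString(2, event);
        ps.addBatch();
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    /** Sends all queued rows to the database in a single batch. */
    public void flush() throws SQLException {
        if (pending == 0) {
            return;
        }
        ps.executeBatch();
        pending = 0;
    }

    /** Intended to be called from cleanup(): writes leftovers and releases JDBC resources. */
    public void close() throws SQLException {
        try {
            flush();
        } finally {
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}
```

One caveat with this approach: rows that are still buffered when a worker
dies are lost, so if you need stronger guarantees, a Trident State used with
partitionPersist, which receives the tuples of a batch together, may be a
better fit than a function.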

On Mon Dec 08 2014 at 10:37:19 PM Sa Li <[email protected]> wrote:

> Hi, All
>
> I have been struggling to get data from Kafka and write it into a
> database (PostgreSQL). I thought it was supposed to be a simple task, but
> it seems I am stuck. I’ve tried two ways; neither of them worked
> properly. Here are my problems:
>
> 1. storm-kafka spout along with storm-RDBMS. I can get data from Kafka
> with the storm-kafka spout, but I have trouble writing into the DB; even
> when I can write into the DB, it writes row by row rather than as a bulk
> copy.
>
> 2. I used a Trident topology and persistentAggregate to update the DB.
> With this method I can write into the DB, but can’t do a bulk copy. Then I
> wrote a BaseFunction to do that, but I am still having trouble passing a
> connection into it. Should I use a singleton connection pool?
>
>
> public static class WriteDB extends BaseFunction {
>         PreparedStatement ps = null ;
>         postgresConnector connector = new postgresConnector() ;
>         Connection conn = connector.connectToDatabaseOrDie();
>         @Override
>         public final void execute(final TridentTuple tuple,
>                                   final TridentCollector collector) {
>             int user = tuple.getInteger(0);
>             String value = tuple.getString(1);
>             final StringBuilder queryBuilder = new StringBuilder()
>                     .append("INSERT INTO test.state(userid, event) VALUES(")
>                     .append(user)
>                     .append(", '")
>                     .append(value)
>                     .append("')");
>             try {
>                 ps = conn.prepareStatement(queryBuilder.toString()) ;
>                 ps.execute();
>                 collector.emit(new Values(tuple.getStringByField("event")));
>             } catch (SQLException ex) {
>                 System.err.println("Caught SQLException: " + ex.getMessage());
>             }finally {
>                 if (ps != null) {
>                     try {
>                         ps.close();
>                     } catch (SQLException ex) {
>                     }
>                 }
>             }
>         }
>     }
>
> I am sure writing data from Kafka into a database in batch mode should not
> be a big issue; however, I can’t find a proper way to do it. Any help?
>
>
> thanks
>
> Alec
