Hi all,

I have been struggling to read data from Kafka and write it into a database
(PostgreSQL). I thought it was supposed to be a simple task, but I seem to be
stuck. I’ve tried two approaches and neither of them worked properly. Here are
my problems:

1. storm-kafka spout together with storm-RDBMS. I can read data from Kafka with
the storm-kafka spout, but I have problems writing to the DB. Even when the
write works, it inserts one row at a time instead of doing a bulk copy.

2. A Trident topology with persistentAggregate to update the DB. With this
method I can write to the DB, but I can’t do a bulk copy. I then wrote a
BaseFunction to do that, but I am still having trouble passing a connection
into it. Should I use a singleton connection pool?


// Needs java.sql.Connection, java.sql.PreparedStatement and java.sql.SQLException,
// plus the Trident classes (their package prefix depends on the Storm version).
public static class WriteDB extends BaseFunction {
    // Created when the function object is constructed; this is the
    // connection I am not sure how to pass in properly.
    postgresConnector connector = new postgresConnector();
    Connection conn = connector.connectToDatabaseOrDie();

    @Override
    public final void execute(final TridentTuple tuple, final TridentCollector collector) {
        int user = tuple.getInteger(0);
        String value = tuple.getString(1);
        // Bind parameters instead of concatenating values into the SQL text.
        final String sql = "INSERT INTO test.state(userid, event) VALUES (?, ?)";
        // try-with-resources closes the statement even if the insert fails.
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, user);
            ps.setString(2, value);
            ps.executeUpdate();
            collector.emit(new Values(tuple.getStringByField("event")));
        } catch (SQLException ex) {
            System.err.println("Caught SQLException: " + ex.getMessage());
        }
    }
}
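
On the connection question, one idea I am considering is a lazily created
holder kept in a static field. As far as I understand, Storm serializes the
function object when the topology is submitted, and static fields are not part
of a serialized object, so the Connection would be created inside the worker
instead. The sketch below is only an illustration under that assumption:
LazyResource is a hypothetical name of mine, not a Storm or JDBC class, and the
factory would be something like my connectToDatabaseOrDie() call.

```java
import java.util.function.Supplier;

/** Holds one lazily created resource per JVM (hypothetical helper, not a Storm class). */
final class LazyResource<T> {
    // Assumption: in my case this would wrap connector.connectToDatabaseOrDie().
    private final Supplier<T> factory;
    private T instance;

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the resource on the first call, then returns the same instance. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }
}
```

In WriteDB this would be a static field such as
static final LazyResource<Connection> DB = new LazyResource<>(() -> new postgresConnector().connectToDatabaseOrDie());
with execute() calling DB.get() instead of using the conn field.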

I am sure that writing data from Kafka into a database in batch mode should not
be a big issue, but I can’t find a proper way to do it. Any help?
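
For reference, this is roughly the batching I have in mind, as a minimal sketch
and not a working Storm component. It buffers rows and writes them with JDBC
batching (addBatch/executeBatch) rather than PostgreSQL's COPY. StateRow,
BatchSink, BatchWriter and JdbcBatchSink are hypothetical names of mine; only
the table test.state(userid, event) comes from my code above.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** One row for test.state(userid, event). */
final class StateRow {
    final int userId;
    final String event;

    StateRow(int userId, String event) {
        this.userId = userId;
        this.event = event;
    }
}

/** Destination for a full batch of rows. */
interface BatchSink {
    void write(List<StateRow> rows);
}

/** Buffers rows and hands them to the sink once batchSize rows have accumulated. */
final class BatchWriter {
    private final int batchSize;
    private final BatchSink sink;
    private final List<StateRow> buffer = new ArrayList<>();

    BatchWriter(int batchSize, BatchSink sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void add(StateRow row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is buffered; rows stay buffered if the sink throws. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.write(new ArrayList<>(buffer));
        buffer.clear();
    }
}

/** Sends each batch to the database as a single JDBC batch. */
final class JdbcBatchSink implements BatchSink {
    private final Connection conn;

    JdbcBatchSink(Connection conn) {
        this.conn = conn;
    }

    @Override
    public void write(List<StateRow> rows) {
        String sql = "INSERT INTO test.state(userid, event) VALUES (?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (StateRow row : rows) {
                ps.setInt(1, row.userId);
                ps.setString(2, row.event);
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException ex) {
            // Unchecked so the caller can decide whether to fail or retry the batch.
            throw new RuntimeException("batch insert failed", ex);
        }
    }
}
```

The idea would be to call add() for every tuple and flush() when a Trident
batch ends, for example new BatchWriter(500, new JdbcBatchSink(conn)), where
500 is an arbitrary batch size I picked for illustration.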


Thanks,

Alec
