Hi all,
I have been struggling to read data from Kafka and write it into a database
(PostgreSQL). I thought it was supposed to be a simple task, but I seem to be
stuck. I’ve tried two approaches and neither of them works properly. Here are
my problems:
1. The storm-kafka spout together with storm-RDBMS. I can read data from Kafka
with the storm-kafka spout, but I have trouble writing to the DB. Even when the
write works, it inserts one row at a time instead of doing a bulk copy.
2. A Trident topology with persistentAggregate to update the DB. With this
method I can write into the DB, but I can’t do a bulk copy. I then wrote the
BaseFunction below to do the write myself, but I am still having trouble
passing the connection into it. Should I use a singleton connection pool?
// Needs java.sql.Connection, PreparedStatement and SQLException, plus Storm's
// BaseFunction, TridentTuple, TridentCollector and Values (the package names
// depend on the Storm version).
public static class WriteDB extends BaseFunction {
    // The connection is created when the function object is constructed;
    // this is the part I am unsure about.
    postgresConnector connector = new postgresConnector();
    Connection conn = connector.connectToDatabaseOrDie();

    @Override
    public final void execute(final TridentTuple tuple,
                              final TridentCollector collector) {
        int user = tuple.getInteger(0);
        String value = tuple.getString(1);
        // Bind the values as parameters instead of concatenating them into
        // the SQL string, so quotes in the event text cannot break the query.
        final String sql = "INSERT INTO test.state(userid, event) VALUES (?, ?)";
        // try-with-resources closes the statement after each tuple.
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, user);
            ps.setString(2, value);
            ps.executeUpdate(); // still one round trip per tuple
            collector.emit(new Values(tuple.getStringByField("event")));
        } catch (SQLException ex) {
            System.err.println("Caught SQLException: " + ex.getMessage());
        }
    }
}
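To make clear what I mean by writing in bulk, here is a minimal sketch in plain JDBC, outside of Storm. It buffers rows and sends them with addBatch()/executeBatch() in one transaction; this is batched INSERTs, not PostgreSQL's COPY. The names BatchedStateWriter, Row, add, pending and flush are made up for illustration. Only the table test.state and its two columns come from my code above, and I have not worked out how this would be wired into a Trident function.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: buffers (userid, event) rows and writes them as one JDBC batch. */
final class BatchedStateWriter {
    /** One buffered row; mirrors the two columns of test.state. */
    static final class Row {
        final int userId;
        final String event;
        Row(int userId, String event) {
            this.userId = userId;
            this.event = event;
        }
    }

    private static final String SQL =
        "INSERT INTO test.state(userid, event) VALUES (?, ?)";

    private final int batchSize;
    private final List<Row> buffer = new ArrayList<>();

    BatchedStateWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers a row; returns true once the buffer has reached batchSize. */
    boolean add(int userId, String event) {
        buffer.add(new Row(userId, event));
        return buffer.size() >= batchSize;
    }

    /** Number of rows waiting to be written. */
    int pending() {
        return buffer.size();
    }

    /**
     * Sends all buffered rows with a single executeBatch() call inside one
     * transaction. On failure the transaction is rolled back and the buffer
     * is kept, so the caller can retry.
     */
    int flush(Connection conn) throws SQLException {
        if (buffer.isEmpty()) {
            return 0;
        }
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            for (Row r : buffer) {
                ps.setInt(1, r.userId);
                ps.setString(2, r.event);
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (SQLException ex) {
            conn.rollback();
            throw ex;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
        int written = buffer.size();
        buffer.clear();
        return written;
    }
}
```

The idea would be to call add() for every tuple and flush(conn) whenever add() returns true, plus once more at the end for any leftover rows.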
I am sure that writing data from Kafka into a database in batch mode should not
be a big issue, but I can’t find a proper way to do it. Any help would be
appreciated.
Thanks,
Alec