Hi,
We are trying to use JDBC.IO.readAll, to query a postgres table, we need to do
some calculation on the resultset and create an object from that calculation.
However, when the query run, we get each line in a different resultset in a
different pcollection, and we are not sure how we can combine the different
results (they all have the same key)
So shortly the question is, how can we get a resultset with the entire query
results (with all the lines)?
Thanks!
This is our current code:
input.apply("read from db", JdbcIO.<KV<String, Iterable<String>>, KV<String,
SomeResult>>readAll()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", PipelineUtil.getDbInfo())
.withUsername("user")
.withPassword(PipelineUtil.readCredentials()))
.withQuery("select * from some_table where some_id = ? order by
insert_timestamp limit 5")
.withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String,
Iterable<String>>>() {
@Override
public void setParameters(KV<String, Iterable<String>> element,
PreparedStatement preparedStatement)
throws Exception {
String someId = element.getKey();
preparedStatement.setString(1, someId);
System.out.println("preparedStatement: " + preparedStatement);
}
})
.withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
public KV<String, SomeResult> mapRow(ResultSet resultSet) throws
Exception {
System.out.println("resultSet: " + resultSet);
String someId = resultSet.getString("some_id");
OurObject ourObject = new OurObject (
resultSet.getString("col1"),
resultSet.getString("col2"),
resultSet.getFloat("col3"),
resultSet.getString("col4"),
resultSet.getLong("col5")
);
SomeResult someResults = new SomeResult(ourObject,
resultSet.getString("output_id"),
resultSet.getString("insert_timestamp")
);
System.out.println("someResults: " + someResults);
return KV.of(nfcId, someResults);
}