Hi,
We are trying to use JdbcIO.readAll to query a Postgres table. We need to do 
some calculation on the result set and create an object from that calculation.
However, when the query runs, each row arrives in its own ResultSet, in a 
different PCollection, and we are not sure how to combine the different 
results (they all have the same key).

So, in short, the question is: how can we get a single result set containing 
the entire query result (all the rows)?
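
To illustrate the shape of the result we are after, here is a minimal sketch in plain Java (no Beam involved): rows that share the same key are collected into one list per key. The names KeyedRow, payload and groupBySomeId are hypothetical and exist only for this illustration; they are not part of our pipeline or of JdbcIO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupRowsSketch {

    // Hypothetical stand-in for one (some_id, row) pair, as produced once per row.
    static final class KeyedRow {
        final String someId;
        final String payload;

        KeyedRow(String someId, String payload) {
            this.someId = someId;
            this.payload = payload;
        }
    }

    // Collects all rows that share a key into one list, keeping arrival order.
    static Map<String, List<String>> groupBySomeId(List<KeyedRow> rows) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (KeyedRow row : rows) {
            grouped.computeIfAbsent(row.someId, k -> new ArrayList<>()).add(row.payload);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<KeyedRow> rows = Arrays.asList(
                new KeyedRow("id-1", "row-a"),
                new KeyedRow("id-1", "row-b"),
                new KeyedRow("id-2", "row-c"));

        // prints {id-1=[row-a, row-b], id-2=[row-c]}
        System.out.println(groupBySomeId(rows));
    }
}
```

In other words, we would like one element per some_id holding all of its rows, instead of one element per row.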

Thanks!

This is our current code:

input.apply("read from db",
    JdbcIO.<KV<String, Iterable<String>>, KV<String, SomeResult>>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", PipelineUtil.getDbInfo())
                .withUsername("user")
                .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select * from some_table where some_id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element,
                                      PreparedStatement preparedStatement) throws Exception {

                String someId = element.getKey();

                preparedStatement.setString(1, someId);
                System.out.println("preparedStatement: " + preparedStatement);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
            @Override
            public KV<String, SomeResult> mapRow(ResultSet resultSet) throws Exception {

                System.out.println("resultSet: " + resultSet);
                String someId = resultSet.getString("some_id");

                OurObject ourObject = new OurObject (
                        resultSet.getString("col1"),
                        resultSet.getString("col2"),
                        resultSet.getFloat("col3"),
                        resultSet.getString("col4"),
                        resultSet.getLong("col5")
                );
                SomeResult someResults = new SomeResult(ourObject,
                        resultSet.getString("output_id"),
                        resultSet.getString("insert_timestamp")
                );
                System.out.println("someResults: " + someResults);

                // Key each row by the some_id read above (nfcId was not defined in this snippet).
                return KV.of(someId, someResults);
            }
