Thanks for the quick response. I will try that.

From: Luke Cwik <[email protected]>
Date: Wednesday, 10 November 2021 at 19:04
To: [email protected] <[email protected]>
Subject: Re: JDBC.IO multiple lines
Getting one record at a time is as expected since the entire result set may not 
fit into memory and the purpose of the RowMapper is to map one row of the 
ResultSet at a time.

If you know that the result set is always going to be small then you can either:
1) Rewrite the query so that you get all the results as a single row (efficient 
but can be inconvenient depending on the query)
2) Apply a GroupByKey with a singleton key like:
p.apply(JdbcIO.readAll(...))               // PCollection<KV<String, SomeResult>>
 .apply(WithKeys.of((Void) null))          // PCollection<KV<Void, KV<String, SomeResult>>>
 .apply(GroupByKey.createWithFewKeys())    // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
 .apply(Values.create())                   // PCollection<Iterable<KV<String, SomeResult>>>
 .apply(ParDo.of(new CombineResultsDoFn(...)));

CombineResultsDoFn is a DoFn that you would author that would get a single 
iterable value containing all the mapped rows from the RowMapper.
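
As a rough sketch of what the body of CombineResultsDoFn might do, the helper
below folds all mapped rows into one value. It deliberately avoids Beam types
so that it runs on its own: CombineResultsSketch, Summary and combine are
hypothetical names, Map.Entry stands in for Beam's KV, and the row value is
reduced to a plain String instead of SomeResult. In a real pipeline the same
loop would sit inside the DoFn's @ProcessElement method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from Beam or from the original code.
class CombineResultsSketch {

    /** Stand-in for the single object computed from all rows of one query. */
    static final class Summary {
        final String someId;
        final List<String> outputIds;

        Summary(String someId, List<String> outputIds) {
            this.someId = someId;
            this.outputIds = outputIds;
        }
    }

    /**
     * Folds every (someId, outputId) pair into one Summary.
     * The Iterable plays the role of the grouped values handed over by GroupByKey.
     */
    static Summary combine(Iterable<Map.Entry<String, String>> rows) {
        String someId = null;
        List<String> outputIds = new ArrayList<>();
        for (Map.Entry<String, String> row : rows) {
            someId = row.getKey();          // assumes all rows carry the same key
            outputIds.add(row.getValue());
        }
        return new Summary(someId, outputIds);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> rows = List.of(
                Map.entry("id-1", "out-a"),
                Map.entry("id-1", "out-b"));
        Summary summary = combine(rows);
        System.out.println(summary.someId + " -> " + summary.outputIds);
    }
}
```

GroupByKey does not guarantee the order of the grouped values, so if the
calculation depends on the query's "order by", the rows would need to be sorted
again inside the DoFn (for example by insert_timestamp).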

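For option 1, one possible shape of the rewritten query on PostgreSQL is to
wrap the original select in json_agg, so that the RowMapper sees exactly one
row containing all matches as a JSON array. This is only a sketch under
assumptions: SingleRowQuerySketch, AGGREGATED_QUERY, readAggregatedJson and the
results_json alias are hypothetical, and the JSON text would still have to be
parsed into SomeResult objects with a JSON library of your choice.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical sketch: the aggregated query and the results_json alias are assumptions.
class SingleRowQuerySketch {

    /** Wraps the original query so that one row holds all matching rows as a JSON array. */
    static final String AGGREGATED_QUERY =
            "select json_agg(t) as results_json from ("
                    + "select * from some_table where some_id = ? "
                    + "order by insert_timestamp limit 5) t";

    /**
     * What a RowMapper body could read from the single aggregated row.
     * Returns null when the inner query matched nothing (json_agg yields NULL).
     */
    static String readAggregatedJson(ResultSet resultSet) throws SQLException {
        return resultSet.getString("results_json");
    }

    public static void main(String[] args) {
        System.out.println(AGGREGATED_QUERY);
    }
}
```

The string would be passed to withQuery(...) in place of the original query;
the single ? placeholder is kept, so the existing parameter setter still works.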

On Wed, Nov 10, 2021 at 6:48 AM Koffman, Noa (Nokia - IL/Kfar Sava) 
<[email protected]> wrote:
Hi,
We are trying to use JdbcIO.readAll to query a Postgres table. We need to do 
some calculation on the result set and create an object from that calculation.
However, when the query runs, we get each line in a separate result set, in a 
separate PCollection element, and we are not sure how to combine the different 
results (they all have the same key).

In short, the question is: how can we get a result set with the entire query 
results (all the lines)?

Thanks!

This is our current code:

input.apply("read from db", JdbcIO.<KV<String, Iterable<String>>, KV<String, SomeResult>>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", PipelineUtil.getDbInfo())
                .withUsername("user")
                .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select * from some_table where some_id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element,
                                      PreparedStatement preparedStatement) throws Exception {

                String someId = element.getKey();

                preparedStatement.setString(1, someId);
                System.out.println("preparedStatement: " + preparedStatement);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
            @Override
            public KV<String, SomeResult> mapRow(ResultSet resultSet) throws Exception {

                System.out.println("resultSet: " + resultSet);
                String someId = resultSet.getString("some_id");

                OurObject ourObject = new OurObject (
                        resultSet.getString("col1"),
                        resultSet.getString("col2"),
                        resultSet.getFloat("col3"),
                        resultSet.getString("col4"),
                        resultSet.getLong("col5")
                );
                SomeResult someResults = new SomeResult(ourObject,
                        resultSet.getString("output_id"),
                        resultSet.getString("insert_timestamp")
                );
                System.out.println("someResults: " + someResults);

                return KV.of(someId, someResults);
            }
        }));
