Thanks for the quick response, I will try that.

From: Luke Cwik <[email protected]>
Date: Wednesday, 10 November 2021 at 19:04
To: [email protected] <[email protected]>
Subject: Re: JDBC.IO multiple lines

Getting one record at a time is expected, since the entire result set may not fit into memory and the purpose of the RowMapper is to map one row of the ResultSet at a time.

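To illustrate the row-at-a-time behaviour, here is a minimal plain-Java sketch. It is not the JdbcIO implementation: the RowMapper interface, the readAll helper, and the use of a Map to stand in for a ResultSet row are all hypothetical simplifications. It only shows that the mapper is called once per row, so each row becomes its own output element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RowAtATimeSketch {
    /** Hypothetical stand-in for a row mapper: converts exactly one row. */
    interface RowMapper<T> {
        T mapRow(Map<String, Object> row) throws Exception;
    }

    /** Calls the mapper once per row; every mapped value is a separate output. */
    static <T> List<T> readAll(List<Map<String, Object>> rows, RowMapper<T> mapper)
            throws Exception {
        List<T> outputs = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            outputs.add(mapper.mapRow(row));
        }
        return outputs;
    }

    public static void main(String[] args) throws Exception {
        // Two rows that share the same key, as in the query from the question.
        List<Map<String, Object>> rows = List.of(
            Map.<String, Object>of("some_id", "a", "col1", "x"),
            Map.<String, Object>of("some_id", "a", "col1", "y"));

        List<String> outputs =
            readAll(rows, row -> row.get("some_id") + ":" + row.get("col1"));

        System.out.println(outputs); // prints [a:x, a:y]
    }
}
```

The two rows come out as two separate elements even though they share a key, which is why any cross-row calculation needs an explicit grouping step afterwards.
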
If you know that the result set is always going to be small, then you can either:

1) Rewrite the query so that you get all the results as a single row (efficient, but can be inconvenient depending on the query).
2) Apply a GroupByKey with a singleton key, like:

    p.apply(JdbcIO.readAll(...))               // PCollection<KV<String, SomeResult>>
     .apply(WithKeys.of((Void) null))          // PCollection<KV<Void, KV<String, SomeResult>>>
     .apply(GroupByKey.createWithFewKeys())    // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
     .apply(Values.create())                   // PCollection<Iterable<KV<String, SomeResult>>>
     .apply(ParDo.of(new CombineResultsDoFn(...)));

CombineResultsDoFn is a DoFn that you would author; it receives a single iterable value containing all the rows mapped by the RowMapper.

On Wed, Nov 10, 2021 at 6:48 AM Koffman, Noa (Nokia - IL/Kfar Sava) <[email protected]> wrote:

Hi,

We are trying to use JdbcIO.readAll to query a Postgres table. We need to do some calculation on the result set and create an object from that calculation.
However, when the query runs, we get each row in a different result set in a different PCollection, and we are not sure how to combine the different results (they all have the same key).
In short, the question is: how can we get a result set with the entire query results (all the rows)?

Thanks!

This is our current code:

    input.apply("read from db", JdbcIO.<KV<String, Iterable<String>>, KV<String, SomeResult>>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", PipelineUtil.getDbInfo())
            .withUsername("user")
            .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select * from some_table where some_id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element, PreparedStatement preparedStatement) throws Exception {
                String someId = element.getKey();
                preparedStatement.setString(1, someId);
                System.out.println("preparedStatement: " + preparedStatement);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
            public KV<String, SomeResult> mapRow(ResultSet resultSet) throws Exception {
                System.out.println("resultSet: " + resultSet);
                String someId = resultSet.getString("some_id");
                OurObject ourObject = new OurObject(
                    resultSet.getString("col1"),
                    resultSet.getString("col2"),
                    resultSet.getFloat("col3"),
                    resultSet.getString("col4"),
                    resultSet.getLong("col5"));
                SomeResult someResults = new SomeResult(
                    ourObject,
                    resultSet.getString("output_id"),
                    resultSet.getString("insert_timestamp"));
                System.out.println("someResults: " + someResults);
                return KV.of(someId, someResults);
            }
