There is no forking after the "Generate Queries" transform.
We noticed that the "Generate Queries" transform is in a different stage than the reading itself. This is likely due to the Reparallelize transform; we see the same behavior with JdbcIO.readAll.
After reading up on Splittable DoFns, we decided to give them a try. We essentially copied the source of JdbcIO into our project and changed `ReadFn` into `SplittableJdbcIO`, which acts as a (more or less) drop-in replacement (see source below).
The resulting DAG is noticeably simpler, with a single stage containing all steps from reading the database to writing. We are also seeing the job parallelize much better than before.
class SplittableJdbcIO<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
    /* ... */

    @ProcessElement
    public void processElement(@Element ParameterT element,
                               RestrictionTracker<OffsetRange, Long> tracker,
                               OutputReceiver<OutputT> out) throws Exception {
        if (connection == null) {
            connection = dataSource.getConnection();
        }

        if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
            LOG.error("Failed to claim restriction");
            return;
        }

        LOG.info("Preparing query. fetchSize={}, shardSize={}, from={}, to={}",
                 fetchSize, shardSize,
                 tracker.currentRestriction().getFrom(),
                 tracker.currentRestriction().getTo());

        String executeQuery = String.format(
            "SELECT * FROM (%s) t OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;", query.get());

        // PostgreSQL requires autocommit to be disabled to enable cursor streaming, see
        // https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
        connection.setAutoCommit(false);
        LOG.debug("Autocommit has been disabled");

        try (PreparedStatement statement = connection.prepareStatement(
                executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            statement.setFetchSize(fetchSize);
            parameterSetter.setParameters(element, statement);

            // The last two parameters of the prepared statement are the OFFSET and
            // FETCH NEXT values for this shard (JDBC parameter indices are 1-based).
            statement.setLong(statement.getParameterMetaData().getParameterCount() - 1,
                              tracker.currentRestriction().getFrom());
            statement.setLong(statement.getParameterMetaData().getParameterCount(),
                              tracker.currentRestriction().getTo()
                                  - tracker.currentRestriction().getFrom());

            queryCounter.inc();

            int count = 0;
            long t0 = Instant.now().getMillis();

            try (ResultSet resultSet = statement.executeQuery()) {
                queryLatency.update(Instant.now().getMillis() - t0);
                LOG.info("Query took {} ms", Instant.now().getMillis() - t0);

                while (resultSet.next()) {
                    out.output(rowMapper.mapRow(resultSet));

                    rowCounter.inc();
                    count++;
                }
                LOG.info("Fetched {} rows", count);
            }
        }
    }

    @SplitRestriction
    public void splitRestriction(@Element ParameterT element,
                                 @Restriction OffsetRange range,
                                 OutputReceiver<OffsetRange> out) {
        // Split the full [from, to) range into shards of at most shardSize rows.
        for (long i = range.getFrom(); i < range.getTo(); i += shardSize) {
            out.output(new OffsetRange(i, Math.min(i + shardSize, range.getTo())));
        }
    }

    @Teardown
    public void teardown() throws Exception {
        if (connection != null) {
            try {
                connection.close();
            } finally {
                connection = null;
            }
        }
    }

    @GetInitialRestriction
    public OffsetRange initialRestriction(@Element ParameterT element) throws Exception {
        if (connection == null) {
            connection = dataSource.getConnection();
        }

        String executeQuery = String.format("SELECT COUNT(*) FROM (%s) t;", query.get());

        try (PreparedStatement statement = connection.prepareStatement(
                executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            parameterSetter.setParameters(element, statement);

            try (ResultSet resultSet = statement.executeQuery()) {
                resultSet.next();

                long result = resultSet.getLong(1);
                LOG.info("The query results in {} rows", result);
                return new OffsetRange(0, result);
            }
        }
    }
}
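For reference, a rough sketch of how such a DoFn can be wired into the pipeline in place of JdbcIO.readAll. The configuration part of the class is elided above, so the constructor arguments below are placeholders rather than the real signature:

```
// Sketch only: the SplittableJdbcIO constructor arguments are placeholders for
// however the copied class is configured (that part of the class is elided above).
// The important part is that the SDF is applied with a plain ParDo, so the runner
// can split the OffsetRange restriction into shards and read them in parallel;
// no separate "generate queries" step is needed.
pipeline
    .apply("Seed", Create.of(0L))
    .apply("Read table via SDF", ParDo.of(
        new SplittableJdbcIO<Long, Row>(
            dataSource, query, rowMapper, parameterSetter, fetchSize, shardSize)))
    .setCoder(SchemaCoder.of(...))
    .apply("more steps", ...);
```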
On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko <[email protected]> wrote:
> Hi,
>
> Did you check the Spark DAG to see whether it forks branches after the "Generate queries" transform?
>
> —
> Alexey
>
> On 24 May 2021, at 20:32, Thomas Fredriksen (External) <[email protected]> wrote:
>
> Hi there,
>
> We are struggling to get the JdbcIO connector to read a large table on Spark.
>
> In short, we wish to read a large table (several billion rows), transform it, then write the transformed data to a new table.
>
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve this, we attempted to create ranges, generate `limit/offset` queries, and use `JdbcIO.readAll()` instead.
>
> The overall steps look something like this (sanitized for readability):
>
> ```
> pipeline
>     .apply("Read row count", JdbcIO.read()
>         .withQuery("select count(*) from MYTABLE;")
>         .withCoder(VarLongCoder.of())
>         .withOtherOptions(...))
>     .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>     .apply("Read results", JdbcIO.<Long, Row>readAll()
>         .withCoder(SchemaCoder.of(...))
>         .withOutputParallelization(false)
>         .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>         .withParameterSetter((element, statement) -> statement.setLong(1, element))
>         .withOtherOptions(...))
>     .apply("more steps", ...);
> ```
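>
> For illustration, a minimal sketch of what the elided query-generating DoFn might look like (MYLIMIT stands in for the page size used in the readAll query; this is an assumption, not the actual implementation):
>
> ```
> // Sketch only: takes the COUNT(*) result and emits one offset per page of
> // MYLIMIT rows, so that readAll() receives one query parameter per page.
> new DoFn<Long, Long>() {
>   private final long MYLIMIT = 100_000L; // assumed page size
>
>   @ProcessElement
>   public void processElement(@Element Long rowCount, OutputReceiver<Long> out) {
>     for (long offset = 0; offset < rowCount; offset += MYLIMIT) {
>       out.output(offset);
>     }
>   }
> }
> ```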
>
> The problem is that this does not seem to parallelize on the Spark runner.
> Only a single worker seems to be doing all the work.
>
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`; however, this did not seem to make a difference.
>
> Our goal is to avoid having all the data from the query cached in memory between
> the read and transform operations, as this causes OOM exceptions. Having a single
> worker read the database is okay as long as the other workers can process the data
> as soon as it is read, rather than having to wait for all of it to be ready.
>
> Any advice on how to approach this would be appreciated.
>
> Best Regards
> Thomas Li Fredriksen
>
>
>