Quick update.

After some testing, we have noticed that the splittable JdbcIO proof of
concept works well when the number of splits does not exceed the number of
Spark tasks. When the number of splits does exceed the task count, the
pipeline freezes after each task has processed a single split.

In our case, we are attempting to read 895676 rows in splits of 10000 rows.
We are running this on a single Spark worker for testing purposes, and the
stage is split into 8 tasks. The pipeline freezes after 8 splits have been
processed. We have verified that the @ProcessElement method returns
without exceptions.
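
To make the numbers concrete, here is a minimal plain-Java sketch of the
shard arithmetic. It mirrors the loop in the splitRestriction method quoted
further down, but the class name ShardMath and the long[] pairs are
hypothetical stand-ins for this illustration and are not Beam types.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardMath {
    /** Returns half-open [from, to) ranges covering at most shardSize rows each. */
    static List<long[]> shards(long from, long to, long shardSize) {
        if (shardSize <= 0) {
            throw new IllegalArgumentException("shardSize must be positive");
        }
        List<long[]> out = new ArrayList<>();
        for (long i = from; i < to; i += shardSize) {
            // The last shard is clipped to the end of the overall range.
            out.add(new long[] {i, Math.min(i + shardSize, to)});
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> s = shards(0L, 895_676L, 10_000L);
        long[] last = s.get(s.size() - 1);
        // prints: 90 splits, last = [890000, 895676)
        System.out.println(s.size() + " splits, last = [" + last[0] + ", " + last[1] + ")");
    }
}
```

With our row count this yields 90 splits for 8 tasks, so a freeze after 8
processed splits leaves 82 splits untouched.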

On the direct runner, we noticed that setting the split size to less than
the checkpoint size (hardcoded to 100) mitigated this problem. Any higher
value causes the pipeline to freeze, just as it does with the SparkRunner.
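
For comparison, here is a minimal plain-Java sketch of the claim-per-offset
loop that the Beam programming guide shows for offset-range restrictions,
where every offset is claimed before its row is emitted. Our posted
processElement claims only the first offset of the range. MiniTracker is a
hypothetical stand-in written for this illustration (it is not Beam's
OffsetRangeTracker), and the checkpoint after 100 outputs only imitates
what we observed on the direct runner. Whether this difference is related
to the freeze is an assumption we have not verified.

```java
import java.util.ArrayList;
import java.util.List;

final class ClaimPerOffsetSketch {
    /** Hypothetical stand-in for an offset-range tracker; NOT Beam's class. */
    static final class MiniTracker {
        private final long from;
        private long to;
        private long lastAttempted;

        MiniTracker(long from, long to) {
            this.from = from;
            this.to = to;
            this.lastAttempted = from - 1;
        }

        long from() {
            return from;
        }

        /** Records the attempt and succeeds only while the offset is inside the range. */
        boolean tryClaim(long offset) {
            lastAttempted = offset;
            return offset < to;
        }

        /** Simulated checkpoint: keep what was attempted, hand back the rest as [start, end). */
        long[] checkpoint() {
            long splitAt = Math.min(lastAttempted + 1, to);
            long[] residual = {splitAt, to};
            to = splitAt;
            return residual;
        }

        /** True once every offset of the (possibly shrunk) range has been attempted. */
        boolean isDone() {
            return from >= to || lastAttempted >= to - 1;
        }
    }

    /** Emits one value per claimed offset; returns the residual range, or null if none. */
    static long[] process(MiniTracker tracker, int checkpointAfter, List<Long> out) {
        long[] residual = null;
        for (long i = tracker.from(); tracker.tryClaim(i); i++) {
            out.add(i); // stands in for out.output(rowMapper.mapRow(resultSet))
            if (residual == null && out.size() == checkpointAfter) {
                residual = tracker.checkpoint(); // simulated runner-initiated checkpoint
            }
        }
        return residual;
    }

    public static void main(String[] args) {
        MiniTracker tracker = new MiniTracker(0, 250);
        List<Long> out = new ArrayList<>();
        long[] residual = process(tracker, 100, out);
        // prints: emitted=100 residual=[100, 250) done=true
        System.out.println("emitted=" + out.size()
                + " residual=[" + residual[0] + ", " + residual[1] + ") done=" + tracker.isDone());
    }
}
```

In this sketch the loop stops on its own once the simulated checkpoint
shrinks the range, and the remaining offsets are returned as a residual
that could be processed later.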

Is there anyone who can help shed some light on this?

On Wed, May 26, 2021 at 7:50 AM Thomas Fredriksen(External) <
[email protected]> wrote:

> There is no forking after the "Generate Queries" transform.
>
> We noticed that the "Generate Queries" transform is in a different stage
> than the reading itself. This is likely due to the Reparallelize-transform,
> and we also see this with JdbcIO.readAll.
>
> After reading up on Splittable DoFns, we decided to give them a try. We
> essentially copied the source of JdbcIO into our project and changed
> `ReadFn` into `SplittableJdbcIO`, which acts as (more or less) a drop-in
> replacement (see source below).
>
> The DAG here is seemingly simpler with a single stage containing all steps
> from reading the DB to writing. We are also seeing that the job is
> parallelizing much better than before.
>
> class SplittableJdbcIO {
>>     /* ... */
>>         @ProcessElement
>>         public void processElement(@Element ParameterT element,
>>                                    RestrictionTracker<OffsetRange, Long> tracker,
>>                                    OutputReceiver<OutputT> out) throws Exception {
>>             if (connection == null) {
>>                 connection = dataSource.getConnection();
>>             }
>>
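>>             if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>                 LOG.error("Failed to claim restriction");
>>                 // The method returns void, so returning is what stops processing;
>>                 // a bare ProcessContinuation.stop() call has no effect.
>>                 return;
>>             }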
>>
>>             LOG.info("Preparing query. fetchSize={}, shardSize={}, from={}, to={}",
>>                     fetchSize, shardSize,
>>                     tracker.currentRestriction().getFrom(),
>>                     tracker.currentRestriction().getTo());
>>
>>             String executeQuery = String.format(
>>                     "SELECT * FROM (%s) t OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;",
>>                     query.get());
>>
>>             // PostgreSQL requires autocommit to be disabled to enable cursor streaming; see
>>             // https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
>>             connection.setAutoCommit(false);
>>             LOG.debug("Autocommit has been disabled");
>>             try (PreparedStatement statement = connection.prepareStatement(
>>                     executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
>>                 statement.setFetchSize(fetchSize);
>>                 parameterSetter.setParameters(element, statement);
>>
>>                 // The two trailing placeholders are OFFSET (second to last) and FETCH NEXT (last).
>>                 int parameterCount = statement.getParameterMetaData().getParameterCount();
>>                 OffsetRange restriction = tracker.currentRestriction();
>>                 statement.setLong(parameterCount - 1, restriction.getFrom());
>>                 statement.setLong(parameterCount, restriction.getTo() - restriction.getFrom());
>>
>>                 queryCounter.inc();
>>
>>                 int count = 0;
>>                 long t0 = Instant.now().getMillis();
>>
>>                 try (ResultSet resultSet = statement.executeQuery()) {
>>                     queryLatency.update(Instant.now().getMillis() - t0);
>>                     LOG.info("Query took {} ms", Instant.now().getMillis() - t0);
>>
>>                     while (resultSet.next()) {
>>                         out.output(rowMapper.mapRow(resultSet));
>>
>>                         rowCounter.inc();
>>                         count++;
>>                     }
>>                     LOG.info("Fetched {} rows", count);
>>
>>                 }
>>             }
>>         }
>>
>>         @SplitRestriction
>>         public void splitRestriction(@Element ParameterT element,
>>                                      @Restriction OffsetRange range,
>>                                      OutputReceiver<OffsetRange> out) {
>>             // Emit consecutive sub-ranges of at most shardSize offsets each.
>>             for (long i = range.getFrom(); i < range.getTo(); i += shardSize) {
>>                 out.output(new OffsetRange(i, Math.min(i + shardSize, range.getTo())));
>>             }
>>         }
>>
>>         @Teardown
>>         public void teardown() throws Exception {
>>             if (connection != null) {
>>                 try {
>>                     connection.close();
>>                 } finally {
>>                     connection = null;
>>                 }
>>             }
>>         }
>>
>>         @GetInitialRestriction
>>         public OffsetRange initialRestriction(@Element ParameterT element) throws Exception {
>>
>>             if (connection == null) {
>>                 connection = dataSource.getConnection();
>>             }
>>
>>             String executeQuery = String.format("SELECT COUNT(*) FROM (%s) t;", query.get());
>>
>>             try (PreparedStatement statement = connection.prepareStatement(
>>                     executeQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
>>                 parameterSetter.setParameters(element, statement);
>>
>>                 try (ResultSet resultSet = statement.executeQuery()) {
>>                     resultSet.next();
>>
>>                     long result = resultSet.getLong(1);
>>                     LOG.info("The query results in {} rows", result);
>>                     return new OffsetRange(0, result);
>>                 }
>>             }
>>         }
>>     }
>> }
>>
>
>
>
> On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko <[email protected]>
> wrote:
>
>> Hi,
>>
>> Did you check the Spark DAG to see whether it forks into branches after
>> the "Generate queries" transform?
>>
>> —
>> Alexey
>>
>> On 24 May 2021, at 20:32, Thomas Fredriksen(External) <
>> [email protected]> wrote:
>>
>> Hi there,
>>
>> We are struggling to get the JdbcIO connector to read a large table on
>> Spark.
>>
>> In short, we wish to read a large table (several billion rows),
>> transform it, and then write the transformed data to a new table.
>>
>> We are aware that `JdbcIO.read()` does not parallelize. To work around
>> this, we attempted to create ranges, generate `limit/offset` queries
>> from them, and use `JdbcIO.readAll()` instead.
>>
>> The overall steps look something like this (sanitized for readability):
>>
>> ```
>> pipeline
>>   .apply("Read row count", JdbcIO.<Long>read()
>>     .withQuery("select count(*) from MYTABLE;")
>>     .withCoder(VarLongCoder.of())
>>     .withOtherOptions(...))
>>   .apply("Generate queries", ParDo.of(new DoFn<Long, Long>() {...})) // Outputs table offsets
>>   .apply("Read results", JdbcIO.<Long, Row>readAll()
>>     .withCoder(SchemaCoder.of(...))
>>     .withOutputParallelization(false)
>>     .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
>>     .withParameterSetter((element, statement) -> statement.setLong(1, element))
>>     .withOtherOptions(...))
>>   .apply("more steps", ...);
>> ```
>>
>> The problem is that this does not seem to parallelize on the Spark
>> runner. Only a single worker seems to be doing all the work.
>>
>> We have tried to break fusion using a variant of
>> `JdbcIO.Reparallelize()`, but this did not seem to make a difference.
>>
>> Our goal is to avoid caching all data from the query in memory between
>> the read and transform operations, which causes OOM exceptions. Having a
>> single worker read the database is okay as long as other workers can
>> process the data as soon as it is read, without having to wait for all
>> of the data to be ready.
>>
>> Any advice on how to approach this would be appreciated.
>>
>> Best Regards
>> Thomas Li Fredriksen
>>
>>
>>
