Hi Alexis,

First of all, I think you can leverage the partitioning and sorting
properties of the data returned by the database using SplitDataProperties.
However, please be aware that SplitDataProperties is a rather experimental
feature.

If used without query parameters, the JDBCInputFormat generates a single
split and queries the database just once. If you want to leverage
parallelism, you have to specify a query with parameters in the WHERE
clause to read different parts of the table.
Note that, depending on the configuration of the database, multiple queries
may result in multiple full scans. Hence, it can make sense to have an
index on the partitioning columns.
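For illustration, here is a minimal sketch of such a parameterized setup
(the table name `metrics` and the range column `id` are made up). Each row
of the parameter matrix fills the `?` placeholders of the query for one
split, so the input format produces one split per row; the commented-out
builder calls show roughly how the matrix would be handed to the
JDBCInputFormat via a GenericParameterValuesProvider:

```java
// Sketch only: table and column names are hypothetical.
import java.io.Serializable;

public class SplitParams {
    // Each inner array fills the '?' placeholders of the query for one split;
    // four rows -> four splits that can be read in parallel.
    public static Serializable[][] buildParams() {
        return new Serializable[][] {
            {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}
        };
    }

    public static void main(String[] args) {
        String query = "SELECT host, ts, id FROM metrics WHERE id >= ? AND id < ?";
        Serializable[][] params = buildParams();
        System.out.println(params.length + " splits for: " + query);
        // Passed to the input format roughly as:
        //   JDBCInputFormat.buildJDBCInputFormat()
        //       .setQuery(query)
        //       .setParametersProvider(new GenericParameterValuesProvider(params))
        //       ...
        //       .finish()
    }
}
```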

If properly configured, the JDBCInputFormat generates multiple splits which
are partitioned. Since the partitioning is encoded in the query, it is
opaque to Flink and must be explicitly declared.
This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
Flink that all records with the same value in the partitioning field are
read from the same split, i.e., the full data is partitioned on that
attribute across splits.
The same can be done for ordering if the query of the JDBCInputFormat is
specified with an ORDER BY clause.
Partitioning and ordering are not mutually exclusive: you can define a
query that partitions on hostname and orders by hostname and timestamp,
and declare both properties in the SDP.

You can get an SDP object by calling DataSource.getSplitDataProperties().
In your example this would be source.getSplitDataProperties().
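Putting the two declarations together, a sketch against the Java DataSet
API (the field indexes 0 = host and 1 = timestamp are assumptions based on
your schema, and it assumes the per-split query contains the matching
ORDER BY clause):

```java
// Sketch for the Flink 1.x Java DataSet API; assumes each split's JDBC query
// selects (host, timestamp, id) and is ordered by host, timestamp.
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.types.Row;

public class SdpSketch {
    // Declares the physical properties of the splits produced by the JDBC
    // source so the optimizer can avoid redundant partitioning and sorting.
    public static void declareProperties(DataSource<Row> source) {
        SplitDataProperties<Row> props = source.getSplitDataProperties();
        // all records with the same value in field 0 (host) come from the same split
        props.splitsPartitionedBy(0);
        // within each split, records are ordered by host (0) and timestamp (1)
        props.splitsOrderedBy(new int[] {0, 1},
                              new Order[] {Order.ASCENDING, Order.ASCENDING});
    }
}
```

Note that these declarations are not validated against the actual query;
if they don't hold, the computed results will be wrong.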

Whatever you do, you should carefully check the execution plan
(ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
validate that the results are identical whether you use SDPs or not.
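Dumping the plan is a one-liner once the job is defined; a minimal sketch
(the DiscardingOutputFormat sink is just a stand-in for your actual job):

```java
// Prints the JSON execution plan; paste it into the plan visualizer.
// The plan must be fully defined (sinks attached) but not yet executed.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanDump {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // stand-in pipeline; in your case the JDBC source job goes here
        env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<>());
        System.out.println(env.getExecutionPlan());
    }
}
```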

Best, Fabian

[1] https://flink.apache.org/visualizer/

2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

> Hi everyone,
>
> I have the following scenario: I have a database table with 3 columns: a
> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
> to do is:
>
> group by host and timestamp -> based on all the IDs in each group, create
> a mapping to n new tuples -> for each unique tuple, count how many times it
> appeared across the resulting data
>
> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>
> What I'm currently doing is roughly:
>
> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
> val source = environment.createInput(input)
> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
> 1).aggregate(SUM, 2)
>
> The query given to JDBCInputFormat provides results ordered by host and
> timestamp, and I was wondering if performance can be improved by specifying
> this in the code. I've looked at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
> and
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
> but I still have some questions:
>
> - If a split is a subset of a partition, what is the meaning of
> SplitDataProperties#splitsPartitionedBy? The wording makes me think that
> a split is divided into partitions, meaning that a partition would be a
> subset of a split.
> - At which point can I retrieve and adjust a SplitDataProperties instance,
> if possible at all?
> - If I wanted a coarser parallelization where each slot gets all the data
> for the same host, would I have to manually create the sub-groups based on
> timestamp?
>
> Regards,
> Alexis.
>
>
