It seems I may have spoken too soon. After executing the job with more data, I can see the following things in the Flink dashboard:
- The first subtask is a chained DataSource -> GroupCombine. Even with parallelism set to 24 and a ParameterValuesProvider returning Array(Array("first"), Array("second")), only 1 thread processed all records. - The second subtask is a Sorted Group Reduce, and I see two weird things: + The first subtask sent 5,923,802 records, yet the second subtask only received 5,575,154 records? + Again, everything was done in a single thread, even though a groupBy was used. - The third and final subtask is a sink that saves back to the database. Does anyone know why parallelism is not being used? Regards, Alexis. On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com> wrote: > Hi Fabian, > > Thanks a lot for the help. The scala DataSet, at least in version 1.5.0, > declares javaSet as private[flink], so I cannot access it directly. > Nevertheless, I managed to get around it by using the java environment: > > val env = org.apache.flink.api.java.ExecutionEnvironment. > getExecutionEnvironment > > val inputFormat = getInputFormat(query, dbUrl, properties) > val outputFormat = getOutputFormat(dbUrl, properties) > > val source = env.createInput(inputFormat) > val sdp = source.getSplitDataProperties > sdp.splitsPartitionedBy(0) > sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING)) > > // transform java DataSet to scala DataSet... > new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]]) > .groupBy(0, 1) > .combineGroup(groupCombiner) > .withForwardedFields("f0->_1") > .groupBy(0, 1) > .reduceGroup(groupReducer) > .withForwardedFields("_1") > .output(outputFormat) > > It seems to work well, and the semantic annotation does remove a hash > partition from the execution plan. > > Regards, > Alexis. > > > On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi Alexis, >> >> The Scala API does not expose a DataSource object but only a Scala >> DataSet which wraps the Java object. >> You can get the SplitDataProperties from the Scala DataSet as follows: >> >> val dbData: DataSet[...] = ??? >> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties >> >> So you first have to get the wrapped Java DataSet, cast it to DataSource >> and then get the properties. >> It's not very nice, but should work. >> >> In order to use SDPs, you should be a bit familiar how physical data >> properties are propagated and discarded in the optimizer. >> For example, applying a simple MapFunction removes all properties because >> the function might have changed the fields on which a DataSet is >> partitioned or sorted. >> You can expose the behavior of a function to the optimizer by using >> Semantic Annotations [1] >> >> Some comments on the code and plan you shared: >> - You might want to add hostname to ORDER BY to have the output grouped >> by (ts, hostname). >> - Check the Global and Local data properties in the plan to validate that >> the SDP were correctly interpreted. >> - If the data is already correctly partitioned and sorted, you might not >> need the Combiners. In either case, you properly want to annotate them with >> Forward Field annoations. >> >> The number of source tasks is unrelated to the number of splits. If you >> have more tasks than splits, some tasks won't process any data. >> >> Best, Fabian >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations >> >> >> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>: >> >>> Hi Fabian, >>> >>> Thanks for the clarification. I have a few remarks, but let me provide >>> more concrete information. You can find the query I'm using, the >>> JDBCInputFormat creation, and the execution plan in this github gist: >>> >>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d >>> >>> I cannot call getSplitDataProperties because >>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the >>> code, I do this instead: >>> >>> val javaEnv = >>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment >>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, >>> "example") >>> >>> which feels wrong (the constructor doesn't accept a Scala environment). >>> Is there a better alternative? >>> >>> I see absolutely no difference in the execution plan whether I use SDP >>> or not, so therefore the results are indeed the same. Is this expected? >>> >>> My ParameterValuesProvider specifies 2 splits, yet the execution plan >>> shows Parallelism=24. Even the source code is a bit ambiguous, considering >>> that the constructor for GenericInputSplit takes two parameters: >>> partitionNumber and totalNumberOfPartitions. Should I assume that there are >>> 2 splits divided into 24 partitions? >>> >>> Regards, >>> Alexis. >>> >>> >>> >>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote: >>> >>>> Hi Alexis, >>>> >>>> First of all, I think you leverage the partitioning and sorting >>>> properties of the data returned by the database using SplitDataProperties. >>>> However, please be aware that SplitDataProperties are a rather >>>> experimental feature. >>>> >>>> If used without query parameters, the JDBCInputFormat generates a >>>> single split and queries the database just once. If you want to leverage >>>> parallelism, you have to specify a query with parameters in the WHERE >>>> clause to read different parts of the table. >>>> Note, depending on the configuration of the database, multiple queries >>>> result in multiple full scans. Hence, it might make sense to have an index >>>> on the partitioning columns. >>>> >>>> If properly configured, the JDBCInputFormat generates multiple splits >>>> which are partitioned. Since the partitioning is encoded in the query, it >>>> is opaque to Flink and must be explicitly declared. >>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells >>>> Flink that all records with the same value in the partitioning field are >>>> read from the same split, i.e, the full data is partitioned on the >>>> attribute across splits. >>>> The same can be done for ordering if the queries of the JDBCInputFormat >>>> is specified with an ORDER BY clause. >>>> Partitioning and grouping are two different things. You can define a >>>> query that partitions on hostname and orders by hostname and timestamp and >>>> declare these properties in the SDP. >>>> >>>> You can get a SDP object by calling >>>> DataSource.getSplitDataProperties(). In your example this would be >>>> source.getSplitDataProperties(). >>>> >>>> Whatever you do, you should carefully check the execution plan >>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and >>>> validate that the result are identical whether you use SDP or not. >>>> >>>> Best, Fabian >>>> >>>> [1] https://flink.apache.org/visualizer/ >>>> >>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>: >>>> >>>>> Hi everyone, >>>>> >>>>> I have the following scenario: I have a database table with 3 columns: >>>>> a host (string), a timestamp, and an integer ID. Conceptually, what I'd >>>>> like to do is: >>>>> >>>>> group by host and timestamp -> based on all the IDs in each group, >>>>> create a mapping to n new tuples -> for each unique tuple, count how many >>>>> times it appeared across the resulting data >>>>> >>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1 >>>>> >>>>> What I'm currently doing is roughly: >>>>> >>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish() >>>>> val source = environment.createInput(inut) >>>>> source.partitionByHash("host", >>>>> "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2) >>>>> >>>>> The query given to JDBCInputFormat provides results ordered by host >>>>> and timestamp, and I was wondering if performance can be improved by >>>>> specifying this in the code. I've looked at >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html >>>>> and >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html, >>>>> but I still have some questions: >>>>> >>>>> - If a split is a subset of a partition, what is the meaning of >>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a >>>>> split is divided into partitions, meaning that a partition would be a >>>>> subset of a split. >>>>> - At which point can I retrieve and adjust a SplitDataProperties >>>>> instance, if possible at all? >>>>> - If I wanted a coarser parallelization where each slot gets all the >>>>> data for the same host, would I have to manually create the sub-groups >>>>> based on timestamp? >>>>> >>>>> Regards, >>>>> Alexis. >>>>> >>>>> >>>> >>