Hi,

have you thought about splitting this into two independent jobs (or calling execute() separately for the two parts)? One job for the update() and one for the insert()?
Even though the update operation should not be expensive, I think it's helpful to understand the performance impact of running inserts and updates concurrently versus executing these operations sequentially. Are the inserts / updates performed on the same table?

On Thu, Jan 21, 2016 at 4:17 PM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:

> Hi Robert,
> sorry, I should have been clearer in my initial mail. The two cases I was
> comparing are:
>
> 1) distinct() before insert (which is necessary as we have a unique key
> constraint in our database), no distinct() before update
> 2) distinct() before insert AND distinct() before update
>
> The test data used actually only contains unique values for the affected
> field though, so the dataset size is not reduced in case 2.
>
> In case 1 the insert does not start until all the data has arrived at
> distinct() while the update is already going along (slowing down upstream
> operators as well). In case 2 both sinks wait for their respective
> distinct()s (which are reached much faster now), then start at roughly the
> same time, leading to a shorter net job time for case 2 compared to case 1.
>
> A pause operator might be useful, yes.
>
> The update should not be an inherently much more expensive operation, as
> the WHERE clause only contains the table's primary key.
>
> Cheers,
> Max
> --
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com * 0176 1000 75 50
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Managing directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>
> On 21.01.2016 at 15:57, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Max,
>
> is the distinct() operation reducing the size of the DataSet? If so, I
> assume you have an idempotent update and the job is faster because fewer
> updates are done?
> If the distinct() operator is not changing anything, then the job might
> be faster because the INSERT is done while Flink is still executing the
> distinct() operation. So the insert is over when the updates are starting.
> This would mean that concurrent inserts and updates on the database are
> much slower than doing this sequentially.
>
> I'm wondering if there is a way in Flink to explicitly ask for spilling an
> intermediate operator to "pause" execution:
>
> Source -----> (spill for pausing) ---> (update sink)
>         \
>          -------> (insert)
>
> I don't have a lot of practical experience with RDBMS, but I guess updates
> are slower because an index lookup + update is necessary. Maybe optimizing
> the database configuration / schema / indexes is more promising. I think
> it's indeed much nicer to avoid any unnecessary steps in Flink.
>
> Did you do any "microbenchmarks" for the update and insert part? I guess
> that would help a lot to understand the impact of certain index structures,
> batching sizes, or database drivers.
>
> Regards,
> Robert
>
> On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them
>> (doing a database update) is performing slower than the other one (an
>> insert). The job as a whole is also slow as upstream operators are slowed
>> down due to backpressure. I am able to speed up the whole job by
>> introducing an a priori unnecessary .distinct(), which of course blocks
>> downstream execution of the slow sink, which in turn seems to be able to
>> execute faster when given all data at once.
>>
>> Any ideas what is going on here? Is there something I can do without
>> introducing unnecessary computation steps?
>>
>> Cheers,
>> Max
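
On the microbenchmark question raised in the thread: below is a minimal sketch of what timing the insert and the update phase separately, one after the other, could look like. It is not Flink code and not the setup from this thread. It assumes Python's built-in sqlite3 module with an in-memory database as a stand-in for the real RDBMS, and the table name `items`, the column `payload`, and the function `run_microbenchmark` are hypothetical names chosen for illustration. The update uses only the primary key in its WHERE clause, as described above, and `batch_size` is there to try different batching sizes.

```python
import sqlite3
import time


def run_microbenchmark(n_rows=10_000, batch_size=1_000):
    """Time batched INSERTs and primary-key UPDATEs separately, one phase after the other."""
    # Assumption: in-memory SQLite is only a stand-in for the real database.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, payload TEXT)")

    def timed_batches(sql, rows):
        # Send the rows in batches, committing once per batch, and return elapsed seconds.
        start = time.perf_counter()
        for i in range(0, len(rows), batch_size):
            conn.executemany(sql, rows[i:i + batch_size])
            conn.commit()
        return time.perf_counter() - start

    insert_rows = [(i, "v%d" % i) for i in range(n_rows)]
    update_rows = [("u%d" % i, i) for i in range(n_rows)]

    # Phase 1: inserts only. Phase 2: updates only, keyed on the primary key.
    insert_s = timed_batches(
        "INSERT INTO items (id, payload) VALUES (?, ?)", insert_rows)
    update_s = timed_batches(
        "UPDATE items SET payload = ? WHERE id = ?", update_rows)

    total = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    updated = conn.execute(
        "SELECT COUNT(*) FROM items WHERE payload LIKE 'u%'").fetchone()[0]
    conn.close()
    return {"insert_s": insert_s, "update_s": update_s,
            "rows": total, "updated": updated}


if __name__ == "__main__":
    print(run_microbenchmark())
```

The absolute numbers from SQLite say nothing about the production database. The point is only the shape of the measurement: same table, same batch size, each phase timed on its own, so that the result can be compared against a run where both kinds of statements hit the database at the same time.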