Hi Wei,

Thanks for the details! Yes, definitely, let’s chat about this in the related Jira issue.
—
Alexey

> On 27 Jan 2022, at 00:13, Wei Hsia ☁ <[email protected]> wrote:
>
> Hi Alexey,
>
> Thanks Cham!
>
> Sorry, I've been working on this in a vacuum, my apologies.
>
> My approach was slightly different: rather than write().withResults(), we
> were taking another approach (not as ideal, but the least intrusive):
> creating another class, WriteRecordsWithOutput, and having WriteRecords
> delegate to the new class.
> It's the Jira issue you attached to yours (I didn't really follow protocol
> as I was just starting to venture into this).
> Partially it's because having the ProducerRecord would be beneficial for the
> use case, and changing the signature on the public class WriteRecords can't be
> an overnight change.
>
> Happy to chat about it - I'd guess I'm halfway done, but if you've got
> something I'd love to chat about it.
>
> Thanks,
>
> Wei
>
> Wei Hsia
> Customer Engineer, Analytics Specialist
> Google Cloud
> [email protected]
> 949.794.2004
>
> On Wed, Jan 26, 2022 at 2:22 PM Alexey Romanenko <[email protected]> wrote:
> Thanks Cham, but actually we already had a similar open issue for this [1] and
> I’m currently working on that one. Also, I created an umbrella Jira for such
> tasks for all other Java SDK IOs, since I believe this behaviour can be quite
> in demand in general [2].
>
> I’d be happy to discuss it in more detail.
>
> —
> Alexey
>
> [1] https://issues.apache.org/jira/browse/BEAM-13298
> [2] https://issues.apache.org/jira/browse/BEAM-13584
>
>> On 26 Jan 2022, at 19:38, Chamikara Jayalath <[email protected]> wrote:
>>
>> Yeah, I think Wait transform will not work here as well. We have to replace
>> Kafka write transform's PDone output with a proper write result. I think
>> +Wei Hsia has been looking into doing something like this. Created
>> https://issues.apache.org/jira/browse/BEAM-13748 for tracking.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jan 26, 2022 at 10:32 AM Yomal de Silva <[email protected]> wrote:
>> Hi Alexey,
>> Thank you for your reply. Yes, that's the use case here.
>>
>> Tried the approach that you have suggested, but for the window to get
>> triggered shouldn't we apply some transform like GroupByKey? Or else the
>> window is ignored, right? As we don't have such a transform applied, we did
>> not observe any windowing behavior in the pipeline.
>>
>> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko <[email protected]> wrote:
>> Hi Yomal de Silva,
>>
>> IIUC, you need to pass the records downstream only once they have been
>> stored in the DB with JdbcIO, don’t you? And your pipeline is unbounded.
>>
>> So, why not split your input data PCollection into windows (if it was not
>> done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is
>> exactly the case for which it was initially developed.
>>
>> —
>> Alexey
>>
>> > On 26 Jan 2022, at 10:16, Yomal de Silva <[email protected]> wrote:
>> >
>> > Hi Beam team,
>> > I have been working on a data pipeline which consumes data from an SFTP
>> > location; the data is passed through multiple transforms and finally
>> > published to a Kafka topic. During this we need to maintain the state of
>> > the file, so we write to the database during the pipeline.
>> > We found that the JdbcIO write is only considered a sink and it
>> > returns a PDone. But for our requirement we need to pass down the elements
>> > that were persisted.
>> >
>> > Using writeVoid is also not useful, since this is an unbounded stream and
>> > we cannot use Wait.on with that without using windows and triggers.
>> >
>> > Read file --> write the file state in db --> enrich data --> publish to
>> > kafka --> update file state
>> >
>> > The above is the basic requirement from the pipeline. Any suggestions?
>> >
>> > Thank you
>> >
