Yes, it is needed for KafkaIO as well. Thank you for creating the ticket. For such JDBC operations, are there any suggestions on a way forward? I am currently trying out an approach using a JDBC template inside a DoFn, but this adds complexity when performing batch operations.
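For reference, a minimal sketch of what that DoFn looks like, using plain JDBC rather than Spring's JdbcTemplate so it stays self-contained. The table name, columns, and the KV<String, String> element shape (file name, state) are assumptions for illustration:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Persists (file name, state) pairs over plain JDBC and forwards them
// downstream. Table, columns, and connection URL are assumptions.
class PersistAndForwardFn extends DoFn<KV<String, String>, KV<String, String>> {

  private final String jdbcUrl;
  private transient Connection connection;
  private transient PreparedStatement insert;

  PersistAndForwardFn(String jdbcUrl) {
    this.jdbcUrl = jdbcUrl;
  }

  @Setup
  public void setup() throws Exception {
    connection = DriverManager.getConnection(jdbcUrl);
    connection.setAutoCommit(false);
    insert = connection.prepareStatement(
        "INSERT INTO file_state (file_name, state) VALUES (?, ?)");
  }

  @ProcessElement
  public void processElement(
      @Element KV<String, String> e, OutputReceiver<KV<String, String>> out)
      throws Exception {
    insert.setString(1, e.getKey());
    insert.setString(2, e.getValue());
    insert.addBatch(); // batching by hand is where the complexity creeps in
    // On most runners the bundle's outputs are only committed once the
    // bundle finishes, i.e. after finishBundle() has flushed the batch.
    out.output(e);
  }

  @FinishBundle
  public void finishBundle() throws Exception {
    insert.executeBatch(); // flush the accumulated batch once per bundle
    connection.commit();
  }

  @Teardown
  public void teardown() throws Exception {
    if (insert != null) insert.close();
    if (connection != null) connection.close();
  }
}
```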
On Thu, Jan 27, 2022 at 12:09 AM Chamikara Jayalath <[email protected]> wrote:

> Yeah, I think the Wait transform will not work here as well. We have to
> replace the Kafka write transform's PDone output with a proper write
> result. I think +Wei Hsia <[email protected]> has been looking into
> doing something like this. Created
> https://issues.apache.org/jira/browse/BEAM-13748 for tracking.
>
> Thanks,
> Cham
>
> On Wed, Jan 26, 2022 at 10:32 AM Yomal de Silva <[email protected]>
> wrote:
>
>> Hi Alexey,
>> Thank you for your reply. Yes, that's the use case here.
>>
>> I tried the approach you suggested, but for the window to get
>> triggered, shouldn't we apply a transform like GroupByKey? Otherwise
>> the window is ignored, right? As we don't have such a transform
>> applied, we did not observe any windowing behavior in the pipeline.
>>
>> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko <
>> [email protected]> wrote:
>>
>>> Hi Yomal de Silva,
>>>
>>> IIUC, you need to pass the records downstream only once they have
>>> been stored in the DB with JdbcIO, don't you? And your pipeline is
>>> unbounded.
>>>
>>> So, why not split your input data PCollection into windows (if that
>>> was not done upstream) and use Wait.on() with
>>> JdbcIO.write().withResults()? This is exactly the case for which it
>>> was initially developed.
>>>
>>> —
>>> Alexey
>>>
>>>
>>> > On 26 Jan 2022, at 10:16, Yomal de Silva <[email protected]>
>>> wrote:
>>> >
>>> > Hi Beam team,
>>> > I have been working on a data pipeline that consumes data from an
>>> SFTP location; the data is passed through multiple transforms and
>>> finally published to a Kafka topic. During this we need to maintain
>>> the state of the file, so we write to the database during the
>>> pipeline.
>>> > We found that JdbcIO write is only considered a sink and returns a
>>> PDone, but for our requirement we need to pass down the elements that
>>> were persisted.
>>> >
>>> > Using writeVoid is also not useful, since this is an unbounded
>>> stream and we cannot use Wait.on with that without using windows and
>>> triggers.
>>> >
>>> > Read file --> write the file state in DB --> enrich data -->
>>> publish to Kafka --> update file state
>>> >
>>> > The above is the basic requirement of the pipeline. Any
>>> suggestions?
>>> >
>>> > Thank you
>>>
>>>
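For anyone following along, this is roughly what the windowed Wait.on() approach from Alexey's reply looks like; as far as I understand, Wait.on() does the per-window gating internally, so no explicit GroupByKey is needed on the main input. The driver, connection URL, table, and KV<String, String> element shape are assumptions for illustration:

```java
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcWriteResult;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// 'fileStates' is the unbounded PCollection<KV<String, String>> of
// (file name, state) pairs read upstream.
PCollection<KV<String, String>> windowed = fileStates.apply(
    "WindowForWait",
    Window.into(FixedWindows.of(Duration.standardMinutes(1))));

// withResults() gives the write a real output that signals, per window,
// that the rows have been persisted.
PCollection<JdbcWriteResult> written = windowed.apply(
    "WriteFileState",
    JdbcIO.<KV<String, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver",            // assumed driver
            "jdbc:postgresql://host:5432/db"))  // assumed connection URL
        .withStatement("INSERT INTO file_state (file_name, state) VALUES (?, ?)")
        .withPreparedStatementSetter((element, stmt) -> {
          stmt.setString(1, element.getKey());
          stmt.setString(2, element.getValue());
        })
        .withResults());

// Each window of 'windowed' is held back until the matching window of
// 'written' completes, i.e. until those rows are in the database.
PCollection<KV<String, String>> persisted =
    windowed.apply("WaitOnDbWrite", Wait.on(written));
// 'persisted' can then flow into the enrichment and Kafka publish steps.
```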
