Hmm, do you mean that if you have a windowed PCollection as an input for "JdbcIO.write().withResults()" (the "signal" transform) and you then apply "Wait.on(signal)", it doesn't work as expected [1]?
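For reference, the pattern I have in mind is roughly the following (a minimal sketch; "FileState", "input", "dataSourceConfig", "setter" and the fixed-window size are illustrative placeholders, not from your pipeline):

    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Window the unbounded input (placeholder window size).
    PCollection<FileState> windowed =
        input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // withResults() makes the write return a signal PCollection<Void>
    // instead of PDone.
    PCollection<Void> signal =
        windowed.apply(
            JdbcIO.<FileState>write()
                .withDataSourceConfiguration(dataSourceConfig)   // placeholder
                .withStatement("INSERT INTO file_state ...")     // placeholder
                .withPreparedStatementSetter(setter)             // placeholder
                .withResults());

    // Wait.on() emits each window of "windowed" only once the corresponding
    // window of "signal" is done, i.e. once the rows have been written.
    PCollection<FileState> afterWrite = windowed.apply(Wait.on(signal));

If I read the Wait javadoc correctly, Wait.on() handles the per-window aggregation of the signal internally, so an explicit GroupByKey should not be needed for the windowing to take effect.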
[1] https://github.com/apache/beam/blob/3c83a3fa0cf68e69d484d048ac8e96de37fc12d5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java#L58

> On 26 Jan 2022, at 19:31, Yomal de Silva <yomal.prav...@gmail.com> wrote:
>
> Hi Alexey,
> Thank you for your reply. Yes, that's the use case here.
>
> I tried the approach that you suggested, but for the window to get
> triggered, shouldn't we apply some transform like GroupByKey? Otherwise the
> window is ignored, right? As we don't have such a transform applied, we did
> not observe any windowing behavior in the pipeline.
>
> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko <aromanenko....@gmail.com
> <mailto:aromanenko....@gmail.com>> wrote:
> Hi Yomal de Silva,
>
> IIUC, you need to pass the records downstream only once they have been
> stored in the DB with JdbcIO, don't you? And your pipeline is unbounded.
>
> So, why not split your input data PCollection into windows (if that was not
> done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is
> exactly the case for which it was initially developed.
>
> —
> Alexey
>
> > On 26 Jan 2022, at 10:16, Yomal de Silva <yomal.prav...@gmail.com
> > <mailto:yomal.prav...@gmail.com>> wrote:
> >
> > Hi Beam team,
> > I have been working on a data pipeline which consumes data from an SFTP
> > location; the data is passed through multiple transforms and finally
> > published to a Kafka topic. During this we need to maintain the state of
> > the file, so we write to the database during the pipeline.
> > We found that JdbcIO.write() is only considered a sink and returns a
> > PDone, but for our requirement we need to pass the persisted elements
> > downstream.
> >
> > Using writeVoid() is also not useful, since this is an unbounded stream
> > and we cannot use Wait.on() with it without using windows and triggers.
> >
> > Read file --> write the file state in DB --> enrich data --> publish to
> > Kafka --> update file state
> >
> > The above is the basic requirement of the pipeline. Any suggestions?
> >
> > Thank you
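P.S. For the overall flow quoted above (Read file --> write the file state in DB --> enrich data --> publish to Kafka --> update file state), a rough end-to-end sketch could look like this (again, the types, SQL statements, broker/topic names, and the "enrich"/"format" transforms are placeholders, not Beam API):

    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.joda.time.Duration;

    // "records" is the unbounded PCollection produced by the SFTP/file
    // source; FileRecord is a placeholder element type.
    PCollection<FileRecord> windowed =
        records.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // Write the file state; withResults() gives a signal to wait on.
    PCollection<Void> stored =
        windowed.apply("WriteFileState",
            JdbcIO.<FileRecord>write()
                .withDataSourceConfiguration(dataSourceConfig)    // placeholder
                .withStatement("INSERT INTO file_state (...) VALUES (...)")
                .withPreparedStatementSetter(insertSetter)        // placeholder
                .withResults());

    // Only elements whose window has been fully written pass downstream.
    PCollection<FileRecord> enriched =
        windowed
            .apply(Wait.on(stored))
            .apply("Enrich", enrichTransform);                    // placeholder

    enriched
        .apply("FormatForKafka", formatAsJson)   // placeholder: FileRecord -> String
        .apply("PublishToKafka",
            KafkaIO.<Void, String>write()
                .withBootstrapServers("broker:9092")              // placeholder
                .withTopic("file-records")                        // placeholder
                .withValueSerializer(StringSerializer.class)
                .values());

    // Note: KafkaIO.write() returns PDone, so this update is not synchronized
    // with the publish step; it runs on the enriched elements in parallel.
    enriched.apply("UpdateFileState",
        JdbcIO.<FileRecord>write()
            .withDataSourceConfiguration(dataSourceConfig)
            .withStatement("UPDATE file_state SET ...")
            .withPreparedStatementSetter(updateSetter));          // placeholder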