Hi Wei,

Thanks for the details! Yes, definitely, let’s chat about this in the related 
Jira issue.

—
Alexey

> On 27 Jan 2022, at 00:13, Wei Hsia ☁ <[email protected]> wrote:
> 
> Hi Alexey, 
> 
> Thanks Cham! 
> 
> Sorry, I've been working on this in a vacuum, my apologies. 
> 
> My approach was slightly different: rather than write().withResults(), we 
> took another approach (not ideal, but the least intrusive): creating a new 
> class, WriteRecordsWithOutput, and having WriteRecords delegate to it. 
> It's the Jira issue you attached to yours (I didn't really follow protocol, 
> as I was just starting to venture into this). 
> Partly that's because having the ProducerRecord would be beneficial for the 
> use case, and changing the signature of the public class WriteRecords can't 
> be an overnight change. 
> 
> Happy to chat about it. I'd guess I'm about halfway done, but if you already 
> have something, I'd love to hear about it. 
> 
> Thanks,
> 
> Wei
> 
> 
> 
> 
> Wei Hsia
> Customer Engineer, Analytics Specialist
> Google Cloud
> [email protected] <mailto:[email protected]>
> 949.794.2004
> 
> 
> On Wed, Jan 26, 2022 at 2:22 PM Alexey Romanenko <[email protected] 
> <mailto:[email protected]>> wrote:
> Thanks Cham, but we actually already have a similar open issue for this [1], 
> and I’m currently working on it. I also created an umbrella Jira for the 
> same task across all other Java SDK IOs, since I believe this behaviour is 
> in demand more generally [2]. 
> 
> I’d be happy to discuss it in more detail.
> 
> —
> Alexey
> 
> [1] https://issues.apache.org/jira/browse/BEAM-13298 
> <https://issues.apache.org/jira/browse/BEAM-13298>
> [2] https://issues.apache.org/jira/browse/BEAM-13584 
> <https://issues.apache.org/jira/browse/BEAM-13584>
> 
> 
> 
> 
>> On 26 Jan 2022, at 19:38, Chamikara Jayalath <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Yeah, I think the Wait transform will not work here either. We have to 
>> replace the Kafka write transform's PDone output with a proper write result. I think 
>> +Wei Hsia <mailto:[email protected]> has been looking into doing something 
>> like this. Created https://issues.apache.org/jira/browse/BEAM-13748 
>> <https://issues.apache.org/jira/browse/BEAM-13748> for tracking.
>> 
>> Thanks,
>> Cham
>> 
>> On Wed, Jan 26, 2022 at 10:32 AM Yomal de Silva <[email protected] 
>> <mailto:[email protected]>> wrote:
>> Hi Alexey,
>> Thank you for your reply. Yes, that's the use case here. 
>> 
>> I tried the approach you suggested, but for the window to be triggered, 
>> shouldn't we apply some transform like GroupByKey? Otherwise the window is 
>> ignored, right? As we don't have such a transform applied, we did not 
>> observe any windowing behavior in the pipeline. 
>> 
>> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko <[email protected] 
>> <mailto:[email protected]>> wrote:
>> Hi Yomal de Silva,
>> 
>> IIUC, you need to pass the records downstream only once they have been 
>> stored in the DB with JdbcIO, don’t you? And your pipeline is unbounded. 
>> 
>> So why not split your input PCollection into windows (if that was not done 
>> upstream) and use Wait.on() with JdbcIO.write().withResults()? This is 
>> exactly the case for which it was initially developed.
>> 
>> —
>> Alexey
>> 
>> 
>> > On 26 Jan 2022, at 10:16, Yomal de Silva <[email protected] 
>> > <mailto:[email protected]>> wrote:
>> > 
>> > Hi beam team,
>> > I have been working on a data pipeline that consumes data from an SFTP 
>> > location; the data is passed through multiple transforms and finally 
>> > published to a Kafka topic. Along the way we need to maintain the state 
>> > of the file, so we write to the database during the pipeline. 
>> > We found that the JdbcIO write is treated only as a sink and returns a 
>> > PDone, but for our requirement we need to pass down the elements that 
>> > were persisted.
>> > 
>> > Using writeVoid is also not useful, since this is an unbounded stream and 
>> > we cannot use Wait.on with it without using windows and triggers. 
>> > 
>> > Read file --> write the file state in DB --> enrich data --> publish to 
>> > Kafka --> update file state
>> > 
>> > The above is the basic requirement for the pipeline. Any suggestions?
>> > 
>> > Thank you
>> 
> 

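For readers following the thread, the delegation idea Wei describes (a new WriteRecordsWithOutput class doing the work, with the existing WriteRecords keeping its public signature and delegating to it) might look roughly like the sketch below. This is only an illustration in plain Java: Done, the String records, the in-memory list used as a "topic", and the expand() signatures are all hypothetical stand-ins, not Beam's PDone, ProducerRecord, or the actual KafkaIO classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of the delegation idea; none of these are Beam classes. */
final class WriteDelegationSketch {

  /** Hypothetical stand-in for a terminal marker such as PDone: carries no elements. */
  static final class Done {
    static final Done INSTANCE = new Done();

    private Done() {}
  }

  /** New class: performs the write and returns the records that were written. */
  static final class WriteRecordsWithOutput {
    // Hypothetical in-memory stand-in for the Kafka topic.
    private final List<String> sink;

    WriteRecordsWithOutput(List<String> sink) {
      this.sink = sink;
    }

    List<String> expand(List<String> records) {
      List<String> written = new ArrayList<>();
      for (String record : records) {
        sink.add(record);    // "publish" the record
        written.add(record); // keep it so downstream steps can consume it
      }
      return written;
    }
  }

  /** Existing class: same output type as before, now delegating to the new class. */
  static final class WriteRecords {
    private final WriteRecordsWithOutput delegate;

    WriteRecords(List<String> sink) {
      this.delegate = new WriteRecordsWithOutput(sink);
    }

    Done expand(List<String> records) {
      delegate.expand(records); // the write happens in the delegate; its output is dropped
      return Done.INSTANCE;     // callers still see the old terminal result
    }
  }

  public static void main(String[] args) {
    List<String> topic = new ArrayList<>();
    new WriteRecords(topic).expand(List.of("a", "b"));
    List<String> written = new WriteRecordsWithOutput(topic).expand(List.of("c"));
    System.out.println("topic=" + topic + " written=" + written);
  }
}
```

The point of this shape is that existing callers of WriteRecords are unaffected, while new callers that need the persisted elements can use WriteRecordsWithOutput directly.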