I'm developing with the DirectRunner, but it should go to Dataflow when deployed.
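
For reference, this is roughly how the runner gets chosen (a sketch using the standard Beam option flags; the exact wiring in my repo may differ):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// args = the String[] passed to main(...).
// Locally:   --runner=DirectRunner
// Deployed:  --runner=DataflowRunner --project=... --region=... --tempLocation=gs://...
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);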

-------- Original Message --------
On Apr 22, 2023, 13:13, Reuven Lax via user wrote:

> What runner are you using to run this pipeline?
>
> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
>> Same result:
>> PCollection<String> result = p
>>     .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
>>         .fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
>>     .apply("Transform", ParDo.of(new MyTransformer()))
>>     .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>         .triggering(AfterWatermark.pastEndOfWindow()
>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
>>         .withAllowedLateness(Duration.standardMinutes(1))
>>         .discardingFiredPanes());
>>
>> PCollection<Void> insert = result.apply("Inserting",
>>     JdbcIO.<String>write()
>>         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>         .withPreparedStatementSetter((element, preparedStatement) -> {
>>             log.info("Preparing statement to insert");
>>             preparedStatement.setString(1, element);
>>         })
>>         .withResults());
>>
>> result.apply(Wait.on(insert))
>>     .apply("Selecting", new SomeTransform())
>>     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>
>> p.run();
>>
>> I updated the GitHub repo as well.
>>
>> ------- Original Message -------
>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user 
>> <user@beam.apache.org> wrote:
>>
>>> The other problem you have here is that you have not set a window. Wait.on 
>>> waits for the end of the current window before triggering. The default 
>>> Window is the GlobalWindow, so as written Wait.on will wait for the end of 
>>> time (or until you drain the pipeline, which will also trigger the 
>>> GlobalWindow).
>>> Try adding a 1-minute fixed window to the results you read from PubSub.
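
A minimal, untested sketch of that suggestion (fixed window only, without the triggering and lateness settings shown further up the thread), reusing the result PCollection from the quoted code:

// Give Wait.on window boundaries to fire on; with the default GlobalWindow
// the signal never completes, so the downstream steps stay blocked.
PCollection<String> windowed = result
    .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
// Both the JdbcIO write (the signal) and the Wait.on input would then use
// `windowed` instead of `result`.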
>>>
>>> On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>>>
>>> > writeVoid() and write() plus withResults() return the same PCollection<Void>, AFAIK.
>>> > In any case, I updated the code and the same thing happens:
>>> >
>>> > PCollection<String> result = p
>>> >     .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
>>> >         .fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
>>> >     .apply("Transform", ParDo.of(new MyTransformer()));
>>> >
>>> > PCollection<Void> insert = result.apply("Inserting",
>>> >     JdbcIO.<String>write()
>>> >         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>> >         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>> >         .withPreparedStatementSetter((element, preparedStatement) -> {
>>> >             log.info("Preparing statement to insert");
>>> >             preparedStatement.setString(1, element);
>>> >         })
>>> >         .withResults());
>>> >
>>> > result.apply(Wait.on(insert))
>>> >     .apply("Selecting", new SomeTransform())
>>> >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>> > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>>> >
>>> > ------- Original Message -------
>>> > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user 
>>> > <user@beam.apache.org> wrote:
>>> >
>>> >
>>> > > I believe you have to call withResults() on the JdbcIO transform in 
>>> > > order for this to work.
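
A hedged aside on why that matters, as I understand it: plain JdbcIO.write() expands to PDone, which gives Wait.on nothing to consume, whereas writeVoid() or write().withResults() produce a PCollection that can act as the signal. A minimal untested sketch:

// write() alone would end the branch with PDone; a PCollection-producing
// variant is needed so the write can be passed to Wait.on as the signal.
PCollection<Void> signal = result.apply("Inserting", JdbcIO.<String>writeVoid()
    .withDataSourceProviderFn(/*...*/)
    .withStatement(/*...*/)
    .withPreparedStatementSetter(/*...*/));
result.apply(Wait.on(signal)).apply("Selecting", new SomeTransform());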
>>> > >
>>> > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> 
>>> > > wrote:
>>> > >
>>> > > > I hope you all are doing well. I am facing an issue with an Apache 
>>> > > > Beam pipeline that gets stuck indefinitely when using the Wait.on 
>>> > > > transform alongside JdbcIO. Here's a simplified version of my code, 
>>> > > > focusing on the relevant parts:
>>> > > >
>>> > > > PCollection<String> result = p
>>> > > >     .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
>>> > > >     .apply("Transform", ParDo.of(new MyTransformer()));
>>> > > >
>>> > > > PCollection<Void> insert = result.apply("Inserting",
>>> > > >     JdbcIO.<String>writeVoid()
>>> > > >         .withDataSourceProviderFn(/*...*/)
>>> > > >         .withStatement(/*...*/)
>>> > > >         .withPreparedStatementSetter(/*...*/));
>>> > > >
>>> > > > result.apply(Wait.on(insert))
>>> > > >     .apply("Selecting", new SomeTransform())
>>> > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>> > > >
>>> > > > p.run();
>>> > > >
>>> > > > In the code, I'm using the Wait.on transform to make the pipeline 
>>> > > > wait until the insert transform (which uses JdbcIO to write data) is 
>>> > > > completed before executing the next steps. However, the pipeline gets 
>>> > > > stuck and doesn't progress further.
>>> > > >
>>> > > > I've tried adding logging messages in my transforms to track the 
>>> > > > progress and identify where it's getting stuck, but I haven't been 
>>> > > > able to pinpoint the issue. I've searched for solutions online, but 
>>> > > > none of them provided a successful resolution for my problem.
>>> > > >
>>> > > > Can anyone provide any insights or suggestions on how to debug and 
>>> > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam 
>>> > > > pipeline?
>>> > > >
>>> > > > You can find the sample code at: https://github.com/j1cs/app-beam
>>> > > >
>>> > > > Thank you for your help and support.
>>> > > >
>>> > > > Best regards,
>>> > > >
>>> > > > Juan Cuzmar.
