I see. If you don't mind, could you give me an example? I am not very 
knowledgeable in Apache Beam.
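
For reference, here is a minimal sketch of the kind of TestStream test suggested in the quoted reply below. It assumes the Beam Java SDK and the direct runner are on the classpath. The class name, the element values, and the "FakeInsert" step are hypothetical: FakeInsert only stands in for the JdbcIO write so that no database is needed. The idea is that TestStream replaces the Pub/Sub read, and advancing the watermark past the end of the 1-minute window should let Wait.on release the main input.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical sketch, not the code from the repository discussed in this thread.
class WaitOnTestStreamSketch {

  static String runSketch() {
    // With no options set, Beam picks the direct runner if it is on the classpath.
    Pipeline p = Pipeline.create();

    Instant start = new Instant(0L);
    Instant endOfFirstWindow = start.plus(Duration.standardMinutes(1));

    // Stand-in for the Pub/Sub source: two elements in the first 1-minute window,
    // then the watermark is moved manually past the end of that window.
    TestStream<String> events =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("john", start.plus(Duration.standardSeconds(10))),
                TimestampedValue.of("jane", start.plus(Duration.standardSeconds(20))))
            .advanceWatermarkTo(start.plus(Duration.standardMinutes(2)))
            .advanceWatermarkToInfinity();

    PCollection<String> result =
        p.apply("FakePubsub", events)
            .apply("Windowing",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Assumption: stand-in for JdbcIO.write().withResults(); it only produces a signal.
    PCollection<String> insert =
        result.apply("FakeInsert",
            MapElements.into(TypeDescriptors.strings()).via((String s) -> "inserted " + s));

    // Main input is held back until the signal's window is complete.
    PCollection<String> afterWait = result.apply(Wait.on(insert));

    // The pipeline fails if the elements never come out of Wait.on for this window.
    PAssert.that(afterWait)
        .inWindow(new IntervalWindow(start, endOfFirstWindow))
        .containsInAnyOrder("john", "jane");

    return p.run().waitUntilFinish().name();
  }

  public static void main(String[] args) {
    System.out.println(runSketch());
  }
}
```

To exercise the real pipeline this way, the TestStream would take the place of the PubsubIO read (with a coder matching whatever that step emits), and the rest of the transforms would stay as they are.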

-------- Original Message --------
On Apr 22, 2023, 13:39, Reuven Lax via user wrote:

> Oh - in that case it's possible that the problem may be the direct runner's 
> implementation of the pubsub source - especially the watermark. For a 
> direct-runner test, I recommend using TestStream (which allows you to advance 
> the watermark manually, so you can test windowing).
>
> On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
>> I'm developing with the direct runner, but the pipeline should run on Dataflow when deployed.
>>
>> -------- Original Message --------
>> On Apr 22, 2023, 13:13, Reuven Lax via user <user@beam.apache.org> wrote:
>>
>>> What runner are you using to run this pipeline?
>>>
>>> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>>>
>>>> Same result:
>>>> PCollection<String> result = p
>>>>     .apply("Pubsub",
>>>>         PubsubIO.readMessagesWithAttributes().fromSubscription(
>>>>             String.format("projects/%s/subscriptions/%s",
>>>>                 options.getProjectId(), subscription)))
>>>>     .apply("Transform", ParDo.of(new MyTransformer()))
>>>>     .apply("Windowing",
>>>>         Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>>             .triggering(AfterWatermark.pastEndOfWindow()
>>>>                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                     .plusDelayOf(Duration.standardSeconds(30))))
>>>>             .withAllowedLateness(Duration.standardMinutes(1))
>>>>             .discardingFiredPanes());
>>>>
>>>> PCollection<Void> insert = result.apply("Inserting",
>>>>     JdbcIO.<String>write()
>>>>         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>>>         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>>>         .withPreparedStatementSetter((element, preparedStatement) -> {
>>>>             log.info("Preparing statement to insert");
>>>>             preparedStatement.setString(1, element);
>>>>         })
>>>>         .withResults());
>>>> result.apply(Wait.on(insert))
>>>>     .apply("Selecting", new SomeTransform())
>>>>     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>> p.run();
>>>>
>>>> I updated the GitHub repo as well.
>>>>
>>>> ------- Original Message -------
>>>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user 
>>>> <user@beam.apache.org> wrote:
>>>>
>>>>> The other problem you have here is that you have not set a window. 
>>>>> Wait.on waits for the end of the current window before triggering. The 
>>>>> default Window is the GlobalWindow, so as written Wait.on will wait for 
>>>>> the end of time (or until you drain the pipeline, which will also trigger 
>>>>> the GlobalWindow).
>>>>> Try adding a 1-minute fixed window to the results you read from PubSub.
>>>>>
>>>>> On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> 
>>>>> wrote:
>>>>>
>>>>> > As far as I know, writeVoid() and write() plus withResults() return the 
>>>>> > same PCollection<Void>. In any case, I updated the code and the same 
>>>>> > thing happens:
>>>>> >
>>>>> > PCollection<String> result = p
>>>>> >     .apply("Pubsub",
>>>>> >         PubsubIO.readMessagesWithAttributes().fromSubscription(
>>>>> >             String.format("projects/%s/subscriptions/%s",
>>>>> >                 options.getProjectId(), subscription)))
>>>>> >     .apply("Transform", ParDo.of(new MyTransformer()));
>>>>> >
>>>>> > PCollection<Void> insert = result.apply("Inserting",
>>>>> >     JdbcIO.<String>write()
>>>>> >         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>>>> >         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>>>> >         .withPreparedStatementSetter((element, preparedStatement) -> {
>>>>> >             log.info("Preparing statement to insert");
>>>>> >             preparedStatement.setString(1, element);
>>>>> >         })
>>>>> >         .withResults());
>>>>> > result.apply(Wait.on(insert))
>>>>> >     .apply("Selecting", new SomeTransform())
>>>>> >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>>> > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>>>>> >
>>>>> > ------- Original Message -------
>>>>> > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user 
>>>>> > <user@beam.apache.org> wrote:
>>>>> >
>>>>> >
>>>>> > > I believe you have to call withResults() on the JdbcIO transform in 
>>>>> > > order for this to work.
>>>>> > >
>>>>> > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> 
>>>>> > > wrote:
>>>>> > >
>>>>> > > > I hope you all are doing well. I am facing an issue with an Apache 
>>>>> > > > Beam pipeline that gets stuck indefinitely when using the Wait.on 
>>>>> > > > transform alongside JdbcIO. Here's a simplified version of my code, 
>>>>> > > > focusing on the relevant parts:
>>>>> > > >
>>>>> > > > PCollection<String> result = p
>>>>> > > >     .apply("Pubsub",
>>>>> > > >         PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
>>>>> > > >     .apply("Transform", ParDo.of(new MyTransformer()));
>>>>> > > >
>>>>> > > > PCollection<Void> insert = result.apply("Inserting",
>>>>> > > >     JdbcIO.<String>writeVoid()
>>>>> > > >         .withDataSourceProviderFn(/*...*/)
>>>>> > > >         .withStatement(/*...*/)
>>>>> > > >         .withPreparedStatementSetter(/*...*/));
>>>>> > > >
>>>>> > > > result.apply(Wait.on(insert))
>>>>> > > >     .apply("Selecting", new SomeTransform())
>>>>> > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>>> > > > p.run();
>>>>> > > >
>>>>> > > > In the code, I'm using the Wait.on transform to make the pipeline 
>>>>> > > > wait until the insert transform (which uses JdbcIO to write data) 
>>>>> > > > is completed before executing the next steps. However, the pipeline 
>>>>> > > > gets stuck and doesn't progress further.
>>>>> > > >
>>>>> > > > I've tried adding logging messages in my transforms to track the 
>>>>> > > > progress and identify where it's getting stuck, but I haven't been 
>>>>> > > > able to pinpoint the issue. I've searched for solutions online, but 
>>>>> > > > none of them provided a successful resolution for my problem.
>>>>> > > >
>>>>> > > > Can anyone provide any insights or suggestions on how to debug and 
>>>>> > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam 
>>>>> > > > pipeline?
>>>>> > > >
>>>>> > > > You can find the sample code at: https://github.com/j1cs/app-beam
>>>>> > > >
>>>>> > > > Thank you for your help and support.
>>>>> > > >
>>>>> > > > Best regards,
>>>>> > > >
>>>>> > > > Juan Cuzmar.
