What runner are you using to run this pipeline?

On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:

> Same result:
> PCollection<String> result = p
>         .apply("Pubsub",
>                 PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
>                         options.getProjectId(), subscription)))
>         .apply("Transform", ParDo.of(new MyTransformer()))
>         .apply("Windowing",
>                 Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>                         .triggering(AfterWatermark.pastEndOfWindow()
>                                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                                         .plusDelayOf(Duration.standardSeconds(30))))
>                         .withAllowedLateness(Duration.standardMinutes(1))
>                         .discardingFiredPanes());
>
> PCollection<Void> insert = result.apply("Inserting",
>         JdbcIO.<String>write()
>                 .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>                 .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>                 .withPreparedStatementSetter((element, preparedStatement) -> {
>                     log.info("Preparing statement to insert");
>                     preparedStatement.setString(1, element);
>                 })
>                 .withResults()
> );
> result.apply(Wait.on(insert))
>         .apply("Selecting", new SomeTransform())
>         .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> I updated the GitHub repo as well.
>
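The following is a plain-Java toy model of what the windowing in this updated pipeline is configured to do. It is not Beam's implementation, and it simplifies watermark handling. The model captures three behaviors:

- Elements are assigned to 1-minute fixed windows.
- Elements arriving within 1 minute after the window end are kept as late data; anything later is dropped.
- `discardingFiredPanes()` makes each pane, early or on-time, contain only the elements that arrived since the previous firing.

The class name, the millisecond timeline, and the example firing times are assumptions for illustration. The example uses an early firing 30 seconds after the first element and an on-time firing at the window end. The pane example also treats processing time and event time as a single timeline, which is a deliberate simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Beam code: FixedWindows of 1 minute, allowedLateness of
// 1 minute, discardingFiredPanes(). All times are plain milliseconds.
public class WindowingModel {
    static final long WINDOW_SIZE = 60_000L;      // FixedWindows.of(1 minute)
    static final long ALLOWED_LATENESS = 60_000L; // withAllowedLateness(1 minute)

    enum Arrival { ON_TIME, LATE, DROPPED }

    // End (exclusive) of the fixed window containing eventTs.
    static long windowEnd(long eventTs) {
        return eventTs - Math.floorMod(eventTs, WINDOW_SIZE) + WINDOW_SIZE;
    }

    // Classify an element by where the watermark is when the element arrives.
    static Arrival classify(long eventTs, long watermark) {
        long end = windowEnd(eventTs);
        if (watermark < end) return Arrival.ON_TIME;                 // window still open
        if (watermark < end + ALLOWED_LATENESS) return Arrival.LATE; // late but kept
        return Arrival.DROPPED;                                      // past allowed lateness
    }

    // With discarding panes, each firing emits only the elements that arrived
    // since the previous firing. Both arrays must be sorted ascending.
    static List<Integer> discardingPaneSizes(long[] arrivals, long[] firings) {
        List<Integer> sizes = new ArrayList<>();
        int i = 0;
        for (long fire : firings) {
            int count = 0;
            while (i < arrivals.length && arrivals[i] <= fire) {
                count++;
                i++;
            }
            sizes.add(count);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Event at 90s belongs to window [60s, 120s).
        System.out.println(classify(90_000L, 100_000L)); // ON_TIME
        System.out.println(classify(90_000L, 150_000L)); // LATE
        System.out.println(classify(90_000L, 200_000L)); // DROPPED
        // Arrivals at 0s, 10s, 40s; early firing at 30s, on-time firing at 60s.
        System.out.println(discardingPaneSizes(
                new long[] {0L, 10_000L, 40_000L},
                new long[] {30_000L, 60_000L})); // [2, 1]
    }
}
```

With accumulating panes instead, the on-time pane in this example would repeat the two early elements as well. That distinction matters if the downstream JDBC insert is not idempotent.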
> ------- Original Message -------
> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>
> > The other problem you have here is that you have not set a window.
> > Wait.on waits for the end of the current window before triggering. The
> > default Window is the GlobalWindow, so as written Wait.on will wait for the
> > end of time (or until you drain the pipeline, which will also trigger the
> > GlobalWindow).
> > Try adding a 1-minute fixed window to the results you read from PubSub.
> >
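The reasoning in Reuven's reply can be made concrete with a toy model in plain Java. This model has no Beam dependency and is not Beam's actual implementation of Wait.on. It captures the gating he describes: an element held by Wait.on is released only once the watermark reaches the end of the signal's window.

The model makes the following assumptions:

- The class and method names are hypothetical.
- Times are plain milliseconds.
- The end of the GlobalWindow is represented by `Long.MAX_VALUE` as a stand-in for "end of time".
- Draining the pipeline is represented by pushing the watermark to that value.

```java
import java.time.Duration;

// Toy model, NOT Beam code: Wait.on releases a main-input element only after
// the watermark reaches the end of the signal's window. Times are millis.
public class WaitOnModel {
    // Hypothetical stand-in for the end of the GlobalWindow ("end of time").
    static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

    // End (exclusive) of the fixed window that contains timestampMillis.
    static long fixedWindowEnd(long timestampMillis, Duration size) {
        long sizeMillis = size.toMillis();
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        return start + sizeMillis;
    }

    // True once the watermark has reached the end of the element's window.
    static boolean released(long windowEnd, long watermarkMillis) {
        return watermarkMillis >= windowEnd;
    }

    public static void main(String[] args) {
        long elementTs = 90_000L;  // element at t = 1m30s
        long watermark = 125_000L; // watermark has advanced to t = 2m05s

        long fixedEnd = fixedWindowEnd(elementTs, Duration.ofMinutes(1)); // 120_000
        System.out.println("1-minute fixed window released: " + released(fixedEnd, watermark));
        System.out.println("GlobalWindow released: " + released(GLOBAL_WINDOW_END, watermark));
        // Draining is modeled as pushing the watermark to "end of time".
        System.out.println("GlobalWindow after drain: " + released(GLOBAL_WINDOW_END, Long.MAX_VALUE));
    }
}
```

Under this model, the 1-minute fixed window releases the element once the watermark passes 2m00s. In a streaming pipeline, the GlobalWindow-gated element is only ever released by a drain.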
> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> >
> > > writeVoid() and write() plus withResults() return the same
> > > PCollection<Void>, AFAIK. In any case, I updated the code and the same
> > > thing happens:
> > >
> > > PCollection<String> result = p
> > >         .apply("Pubsub",
> > >                 PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> > >                         options.getProjectId(), subscription)))
> > >         .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection<Void> insert = result.apply("Inserting",
> > >         JdbcIO.<String>write()
> > >                 .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > >                 .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > >                 .withPreparedStatementSetter((element, preparedStatement) -> {
> > >                     log.info("Preparing statement to insert");
> > >                     preparedStatement.setString(1, element);
> > >                 })
> > >                 .withResults()
> > > );
> > > result.apply(Wait.on(insert))
> > >         .apply("Selecting", new SomeTransform())
> > >         .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > >
> > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > >
> > > ------- Original Message -------
> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <user@beam.apache.org> wrote:
> > >
> > >
> > > > I believe you have to call withResults() on the JdbcIO transform in
> > > > order for this to work.
> > > >
> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> > > >
> > > > > I hope you all are doing well. I am facing an issue with an Apache
> > > > > Beam pipeline that gets stuck indefinitely when using the Wait.on
> > > > > transform alongside JdbcIO. Here's a simplified version of my code,
> > > > > focusing on the relevant parts:
> > > > >
> > > > > PCollection<String> result = p
> > > > >         .apply("Pubsub",
> > > > >                 PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > >         .apply("Transform", ParDo.of(new MyTransformer()));
> > > > >
> > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > >         JdbcIO.<String>writeVoid()
> > > > >                 .withDataSourceProviderFn(/*...*/)
> > > > >                 .withStatement(/*...*/)
> > > > >                 .withPreparedStatementSetter(/*...*/)
> > > > > );
> > > > >
> > > > > result.apply(Wait.on(insert))
> > > > >         .apply("Selecting", new SomeTransform())
> > > > >         .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > p.run();
> > > > >
> > > > > In the code, I'm using the Wait.on transform to make the pipeline
> > > > > wait until the insert transform (which uses JdbcIO to write data) has
> > > > > completed before executing the next steps. However, the pipeline gets
> > > > > stuck and doesn't progress further.
> > > > >
> > > > > I've tried adding logging messages in my transforms to track the
> > > > > progress and identify where it's getting stuck, but I haven't been
> > > > > able to pinpoint the issue. I've searched for solutions online, but
> > > > > none of them resolved my problem.
> > > > >
> > > > > Can anyone provide any insights or suggestions on how to debug and
> > > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam
> > > > > pipeline?
> > > > >
> > > > > You can find the sample code at: https://github.com/j1cs/app-beam
> > > > >
> > > > > Thank you for your help and support.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Juan Cuzmar.
>
