What runner are you using to run this pipeline?

On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:

> Same result:
>
> PCollection<String> result = p
>     .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
>         .fromSubscription(String.format("projects/%s/subscriptions/%s",
>             options.getProjectId(), subscription)))
>     .apply("Transform", ParDo.of(new MyTransformer()))
>     .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(30))))
>         .withAllowedLateness(Duration.standardMinutes(1))
>         .discardingFiredPanes());
>
> PCollection<Void> insert = result.apply("Inserting",
>     JdbcIO.<String>write()
>         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>         .withPreparedStatementSetter((element, preparedStatement) -> {
>             log.info("Preparing statement to insert");
>             preparedStatement.setString(1, element);
>         })
>         .withResults()
> );
> result.apply(Wait.on(insert))
>     .apply("Selecting", new SomeTransform())
>     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> Updated the GitHub repo as well.
>
> ------- Original Message -------
> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <user@beam.apache.org> wrote:
>
> > The other problem you have here is that you have not set a window.
> > Wait.on waits for the end of the current window before triggering. The
> > default Window is the GlobalWindow, so as written Wait.on will wait for the
> > end of time (or until you drain the pipeline, which will also trigger the
> > GlobalWindow).
> >
> > Try adding a 1-minute fixed window to the results you read from PubSub.
> >
> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> >
> > > writeVoid() and write() plus withResults() return the same PCollection<Void> AFAIK.
> > > In any case I updated the code and the same thing happens:
> > >
> > > PCollection<String> result = p.
> > >     apply("Pubsub", PubsubIO.readMessagesWithAttributes()
> > >         .fromSubscription(String.format("projects/%s/subscriptions/%s",
> > >             options.getProjectId(), subscription)))
> > >     .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection<Void> insert = result.apply("Inserting",
> > >     JdbcIO.<String>write()
> > >         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > >         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > >         .withPreparedStatementSetter((element, preparedStatement) -> {
> > >             log.info("Preparing statement to insert");
> > >             preparedStatement.setString(1, element);
> > >         })
> > >         .withResults()
> > > );
> > > result.apply(Wait.on(insert))
> > >     .apply("Selecting", new SomeTransform())
> > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > >
> > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > >
> > > ------- Original Message -------
> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <user@beam.apache.org> wrote:
> > >
> > > > I believe you have to call withResults() on the JdbcIO transform in
> > > > order for this to work.
> > > >
> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> > > >
> > > > > I hope you all are doing well. I am facing an issue with an Apache
> > > > > Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> > > > > alongside JdbcIO. Here's a simplified version of my code, focusing on the
> > > > > relevant parts:
> > > > >
> > > > > PCollection<String> result = p.
> > > > >     apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > >     .apply("Transform", ParDo.of(new MyTransformer()));
> > > > >
> > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > >     JdbcIO.<String>writeVoid()
> > > > >         .withDataSourceProviderFn(/*...*/)
> > > > >         .withStatement(/*...*/)
> > > > >         .withPreparedStatementSetter(/*...*/)
> > > > > );
> > > > >
> > > > > result.apply(Wait.on(insert))
> > > > >     .apply("Selecting", new SomeTransform())
> > > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > p.run();
> > > > >
> > > > > In the code, I'm using the Wait.on transform to make the pipeline
> > > > > wait until the insert transform (which uses JdbcIO to write data) is
> > > > > completed before executing the next steps. However, the pipeline gets stuck
> > > > > and doesn't progress further.
> > > > >
> > > > > I've tried adding logging messages in my transforms to track the
> > > > > progress and identify where it's getting stuck, but I haven't been able to
> > > > > pinpoint the issue. I've searched for solutions online, but none of them
> > > > > provided a successful resolution for my problem.
> > > > >
> > > > > Can anyone provide any insights or suggestions on how to debug and
> > > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > > > >
> > > > > You can find the sample code at: https://github.com/j1cs/app-beam
> > > > >
> > > > > Thank you for your help and support.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Juan Cuzmar.
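
Reuven's point in the thread is that Wait.on releases its input only once the current window has ended, and that the default GlobalWindow effectively never ends. The sketch below is a simplified plain-Java model of why a 1-minute fixed window behaves differently: every element maps to a window with a finite end that the watermark can eventually pass. It does not use the Beam API; the class name FixedWindowSketch and its methods are hypothetical, and the epoch-aligned arithmetic is an assumption about how fixed windows with no offset are laid out.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of fixed-window assignment; hypothetical, not Beam's implementation. */
final class FixedWindowSketch {

    /** Start of the epoch-aligned fixed window that contains the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long tsMillis = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(tsMillis - Math.floorMod(tsMillis, sizeMillis));
    }

    /** Exclusive end of that window: always a finite instant, unlike a global window. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        Instant element = Instant.parse("2023-04-22T09:47:42Z");
        System.out.println("window start: " + windowStart(element, oneMinute));
        System.out.println("window end:   " + windowEnd(element, oneMinute));
    }
}
```

In this model an element stamped 09:47:42 belongs to the window that closes at 09:48:00, so anything waiting on "end of window" has a concrete point to wait for. With a single global window there is no such point short of draining the pipeline, which matches the hang described in the original message.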