AFAIK, writeVoid() and write().withResults() both return the same PCollection<Void>. In any case, I updated the code and the same thing happens:

PCollection<String> result = p
    .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
        .fromSubscription(String.format("projects/%s/subscriptions/%s",
            options.getProjectId(), subscription)))
    .apply("Transform", ParDo.of(new MyTransformer()));

PCollection<Void> insert = result.apply("Inserting",
    JdbcIO.<String>write()
        .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
        .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
        .withPreparedStatementSetter((element, preparedStatement) -> {
            log.info("Preparing statement to insert");
            preparedStatement.setString(1, element);
        })
        .withResults());

result.apply(Wait.on(insert))
    .apply("Selecting", new SomeTransform())
    .apply("PubsubMessaging", ParDo.of(new NextTransformer()));

https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63

------- Original Message -------
On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <user@beam.apache.org> wrote:

> I believe you have to call withResults() on the JdbcIO transform in order for
> this to work.
>
> On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
> > I hope you all are doing well. I am facing an issue with an Apache Beam
> > pipeline that gets stuck indefinitely when using the Wait.on transform
> > alongside JdbcIO. Here's a simplified version of my code, focusing on the
> > relevant parts:
> >
> > PCollection<String> result = p
> >     .apply("Pubsub",
> >         PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> >     .apply("Transform", ParDo.of(new MyTransformer()));
> >
> > PCollection<Void> insert = result.apply("Inserting",
> >     JdbcIO.<String>writeVoid()
> >         .withDataSourceProviderFn(/*...*/)
> >         .withStatement(/*...*/)
> >         .withPreparedStatementSetter(/*...*/)
> > );
> >
> > result.apply(Wait.on(insert))
> >     .apply("Selecting", new SomeTransform())
> >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > p.run();
> >
> > In the code, I'm using the Wait.on transform to make the pipeline wait
> > until the insert transform (which uses JdbcIO to write data) is completed
> > before executing the next steps. However, the pipeline gets stuck and
> > doesn't progress further.
> >
> > I've tried adding logging messages in my transforms to track the progress
> > and identify where it's getting stuck, but I haven't been able to pinpoint
> > the issue. I've searched for solutions online, but none of them provided a
> > successful resolution for my problem.
> >
> > Can anyone provide any insights or suggestions on how to debug and resolve
> > this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> >
> > You can find the sample code at: https://github.com/j1cs/app-beam
> >
> > Thank you for your help and support.
> >
> > Best regards,
> >
> > Juan Cuzmar.