AFAIK, writeVoid() and write() followed by withResults() both return the same
PCollection<Void>. In any case, I updated the code and the same thing happens:

        PCollection<String> result = p
                .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(String.format("projects/%s/subscriptions/%s",
                                options.getProjectId(), subscription)))
                .apply("Transform", ParDo.of(new MyTransformer()));

        PCollection<Void> insert = result.apply("Inserting",
                JdbcIO.<String>write()
                        .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
                        .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
                        .withPreparedStatementSetter((element, preparedStatement) -> {
                            log.info("Preparing statement to insert");
                            preparedStatement.setString(1, element);
                        })
                        .withResults()
        );
        result.apply(Wait.on(insert))
                .apply("Selecting", new SomeTransform())
                .apply("PubsubMessaging", ParDo.of(new NextTransformer()));

https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63

------- Original Message -------
On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user 
<user@beam.apache.org> wrote:


> I believe you have to call withResults() on the JdbcIO transform in order for 
> this to work.
> 
> On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> 
> > I hope you all are doing well. I am facing an issue with an Apache Beam 
> > pipeline that gets stuck indefinitely when using the Wait.on transform 
> > alongside JdbcIO. Here's a simplified version of my code, focusing on the 
> > relevant parts:
> > 
> > PCollection<String> result = p.
> > apply("Pubsub", 
> > PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > .apply("Transform", ParDo.of(new MyTransformer()));
> > 
> > PCollection<Void> insert = result.apply("Inserting",
> > JdbcIO.<String>writeVoid()
> > .withDataSourceProviderFn(/*...*/)
> > .withStatement(/*...*/)
> > .withPreparedStatementSetter(/*...*/)
> > );
> > 
> > result.apply(Wait.on(insert))
> > .apply("Selecting", new SomeTransform())
> > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > p.run();
> > 
> > In the code, I'm using the Wait.on transform to make the pipeline wait 
> > until the insert transform (which uses JdbcIO to write data) is completed 
> > before executing the next steps. However, the pipeline gets stuck and 
> > doesn't progress further.
> > 
> > I've tried adding logging messages in my transforms to track the progress 
> > and identify where it's getting stuck, but I haven't been able to pinpoint 
> > the issue. I've searched for solutions online, but none of them provided a 
> > successful resolution for my problem.
> > 
> > Can anyone provide any insights or suggestions on how to debug and resolve 
> > this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > 
> > You can find the sample code at: https://github.com/j1cs/app-beam
> > 
> > Thank you for your help and support.
> > 
> > Best regards,
> > 
> > Juan Cuzmar.
