I see. If you don't mind, could you give me an example? I am not very knowledgeable in Apache Beam.
-------- Original Message --------
On Apr 22, 2023, 13:39, Reuven Lax via user wrote:
> Oh - in that case it's possible that the problem may be the direct runner's
> implementation of the pubsub source - especially the watermark. For a
> direct-runner test, I recommend using TestStream (which allows you to advance
> the watermark manually, so you can test windowing).
>
> On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
>> I'm developing with the direct runner, but it should go to Dataflow when deployed.
>>
>> -------- Original Message --------
>> On Apr 22, 2023, 13:13, Reuven Lax via user <user@beam.apache.org> wrote:
>>
>>> What runner are you using to run this pipeline?
>>>
>>> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>>>
>>>> Same result:
>>>>
>>>> PCollection<String> result = p
>>>>     .apply("Pubsub",
>>>>         PubsubIO.readMessagesWithAttributes().fromSubscription(
>>>>             String.format("projects/%s/subscriptions/%s",
>>>>                 options.getProjectId(), subscription)))
>>>>     .apply("Transform", ParDo.of(new MyTransformer()))
>>>>     .apply("Windowing",
>>>>         Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>>             .triggering(AfterWatermark.pastEndOfWindow()
>>>>                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                     .plusDelayOf(Duration.standardSeconds(30))))
>>>>             .withAllowedLateness(Duration.standardMinutes(1))
>>>>             .discardingFiredPanes());
>>>>
>>>> PCollection<Void> insert = result.apply("Inserting",
>>>>     JdbcIO.<String>write()
>>>>         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>>>         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>>>         .withPreparedStatementSetter((element, preparedStatement) -> {
>>>>             log.info("Preparing statement to insert");
>>>>             preparedStatement.setString(1, element);
>>>>         })
>>>>         .withResults()
>>>> );
>>>> result.apply(Wait.on(insert))
>>>>     .apply("Selecting", new SomeTransform())
>>>>     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>> p.run();
>>>>
>>>> Updated the GitHub repo as well.
>>>>
>>>> ------- Original Message -------
>>>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user
>>>> <user@beam.apache.org> wrote:
>>>>
>>>>> The other problem you have here is that you have not set a window.
>>>>> Wait.on waits for the end of the current window before triggering. The
>>>>> default Window is the GlobalWindow, so as written Wait.on will wait for
>>>>> the end of time (or until you drain the pipeline, which will also trigger
>>>>> the GlobalWindow).
>>>>> Try adding a 1-minute fixed window to the results you read from PubSub.
>>>>>
>>>>> On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com>
>>>>> wrote:
>>>>>
>>>>> > writeVoid() and write() plus withResults() return the same
>>>>> > PCollection<Void> AFAIK. In any case, I updated the code and the same
>>>>> > thing happens:
>>>>> >
>>>>> > PCollection<String> result = p
>>>>> >     .apply("Pubsub",
>>>>> >         PubsubIO.readMessagesWithAttributes().fromSubscription(
>>>>> >             String.format("projects/%s/subscriptions/%s",
>>>>> >                 options.getProjectId(), subscription)))
>>>>> >     .apply("Transform", ParDo.of(new MyTransformer()));
>>>>> >
>>>>> > PCollection<Void> insert = result.apply("Inserting",
>>>>> >     JdbcIO.<String>write()
>>>>> >         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>>>> >         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>>>> >         .withPreparedStatementSetter((element, preparedStatement) -> {
>>>>> >             log.info("Preparing statement to insert");
>>>>> >             preparedStatement.setString(1, element);
>>>>> >         })
>>>>> >         .withResults()
>>>>> > );
>>>>> > result.apply(Wait.on(insert))
>>>>> >     .apply("Selecting", new SomeTransform())
>>>>> >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>>> >
>>>>> > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>>>>> >
>>>>> > ------- Original Message -------
>>>>> > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user
>>>>> > <user@beam.apache.org> wrote:
>>>>> >
>>>>> > > I believe you have to call withResults() on the JdbcIO transform in
>>>>> > > order for this to work.
>>>>> > >
>>>>> > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com>
>>>>> > > wrote:
>>>>> > >
>>>>> > > > I hope you all are doing well. I am facing an issue with an Apache
>>>>> > > > Beam pipeline that gets stuck indefinitely when using the Wait.on
>>>>> > > > transform alongside JdbcIO. Here's a simplified version of my code,
>>>>> > > > focusing on the relevant parts:
>>>>> > > >
>>>>> > > > PCollection<String> result = p
>>>>> > > >     .apply("Pubsub",
>>>>> > > >         PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
>>>>> > > >     .apply("Transform", ParDo.of(new MyTransformer()));
>>>>> > > >
>>>>> > > > PCollection<Void> insert = result.apply("Inserting",
>>>>> > > >     JdbcIO.<String>writeVoid()
>>>>> > > >         .withDataSourceProviderFn(/*...*/)
>>>>> > > >         .withStatement(/*...*/)
>>>>> > > >         .withPreparedStatementSetter(/*...*/)
>>>>> > > > );
>>>>> > > >
>>>>> > > > result.apply(Wait.on(insert))
>>>>> > > >     .apply("Selecting", new SomeTransform())
>>>>> > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>>>>> > > > p.run();
>>>>> > > >
>>>>> > > > In the code, I'm using the Wait.on transform to make the pipeline
>>>>> > > > wait until the insert transform (which uses JdbcIO to write data)
>>>>> > > > is completed before executing the next steps. However, the pipeline
>>>>> > > > gets stuck and doesn't progress further.
>>>>> > > >
>>>>> > > > I've tried adding logging messages in my transforms to track the
>>>>> > > > progress and identify where it's getting stuck, but I haven't been
>>>>> > > > able to pinpoint the issue. I've searched for solutions online, but
>>>>> > > > none of them provided a successful resolution for my problem.
>>>>> > > >
>>>>> > > > Can anyone provide any insights or suggestions on how to debug and
>>>>> > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam
>>>>> > > > pipeline?
>>>>> > > >
>>>>> > > > You can find the sample code at: https://github.com/j1cs/app-beam
>>>>> > > >
>>>>> > > > Thank you for your help and support.
>>>>> > > >
>>>>> > > > Best regards,
>>>>> > > >
>>>>> > > > Juan Cuzmar.
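
To make the windowing point from this thread concrete (Wait.on holds elements until the window is complete, and the global window only completes at the end of time), here is a minimal plain-Java toy model. It is not Beam code and uses no Beam classes: WindowWatermarkSketch, fixedWindowEnd, isComplete, released and END_OF_TIME are hypothetical names chosen only for illustration, and the "watermark" is simply a long passed in by hand, loosely mirroring how TestStream lets a test advance the watermark explicitly.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: plain Java, no Beam classes. All names here are hypothetical. */
final class WindowWatermarkSketch {

    /** Stand-in for the end of the global window ("the end of time"). */
    static final long END_OF_TIME = Long.MAX_VALUE;

    /** Start of the fixed window containing timestampMillis (non-negative timestamps only). */
    static long fixedWindowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Exclusive end of the fixed window containing timestampMillis. */
    static long fixedWindowEnd(long timestampMillis, long sizeMillis) {
        return fixedWindowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /** In this model a window is complete once the watermark has reached its end. */
    static boolean isComplete(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    /** Element timestamps whose fixed window is complete at the given watermark. */
    static List<Long> released(List<Long> timestamps, long sizeMillis, long watermarkMillis) {
        List<Long> out = new ArrayList<>();
        for (long ts : timestamps) {
            if (isComplete(fixedWindowEnd(ts, sizeMillis), watermarkMillis)) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        List<Long> timestamps = List.of(5_000L, 42_000L, 61_000L);

        // Watermark at 0:30: no one-minute window has ended, so nothing is released.
        System.out.println(released(timestamps, oneMinute, 30_000L));  // prints []
        // Watermark at 1:00: the window [0:00, 1:00) is complete.
        System.out.println(released(timestamps, oneMinute, 60_000L));  // prints [5000, 42000]
        // Global-window analogue: not complete at any finite watermark...
        System.out.println(isComplete(END_OF_TIME, 120_000L));         // prints false
        // ...only once the watermark is advanced "to infinity" (as happens on drain).
        System.out.println(isComplete(END_OF_TIME, Long.MAX_VALUE));   // prints true
    }
}
```

In a real direct-runner test, the hand-passed watermark above would roughly correspond to TestStream's builder calls (addElements, advanceWatermarkTo, advanceWatermarkToInfinity), with the pipeline's FixedWindows deciding when each window is complete. That mapping is a sketch of the idea rather than a tested Beam pipeline.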