I ran the following test and it inserted the data correctly, but when I try to pull the data, it does not arrive.

Pipeline p = Pipeline.create(options);
Coder<String> utf8Coder = StringUtf8Coder.of();
Coder<Map<String, String>> mapCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
Coder<KV<String, Map<String, String>>> kvCoder = KvCoder.of(utf8Coder, mapCoder);

TestStream<KV<String, Map<String, String>>> testStream = TestStream.create(kvCoder)
    .addElements(
        TimestampedValue.of(KV.of("event0", Collections.singletonMap("test", "test")), new Instant(0 * 1000)),
        TimestampedValue.of(KV.of("event1", Collections.singletonMap("test", "test")), new Instant(5 * 1000)),
        TimestampedValue.of(KV.of("event2", Collections.singletonMap("test", "test")), new Instant(10 * 1000)),
        TimestampedValue.of(KV.of("event3", Collections.singletonMap("test", "test")), new Instant(15 * 1000)))
    .advanceWatermarkTo(new Instant(20 * 1000))
    .advanceWatermarkToInfinity();

PCollection<String> result = p
    //.apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
    .apply("TestStream", testStream) // simulated Pub/Sub input for direct-runner testing
    //.apply("Transform", ParDo.of(new MyTransformer()))
    .apply("Transform", ParDo.of(new NewTransform()))
    .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.standardMinutes(1))
        .discardingFiredPanes());

PCollection<Void> insert = result.apply("Inserting", JdbcIO.<String>write()
    .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
    .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
    .withPreparedStatementSetter((element, preparedStatement) -> {
        log.info("Preparing statement to insert");
        preparedStatement.setString(1, element);
    })
    .withResults());

result.apply(Wait.on(insert))
    .apply("Selecting",
        new SomeTransform())
    .apply("PubsubMessaging", ParDo.of(new NextTransformer()));

p.run();

As far as I can see, I have to develop with TestStream and use PubSub in production? In my opinion, that is a very complicated approach.

------- Original Message -------
On Saturday, April 22nd, 2023 at 1:39 PM, Reuven Lax via user <user@beam.apache.org> wrote:

> Oh - in that case it's possible that the problem may be the direct runner's
> implementation of the pubsub source - especially the watermark. For a
> direct-runner test, I recommend using TestStream (which allows you to advance
> the watermark manually, so you can test windowing).
>
> On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
> > I'm developing with the direct runner, but it should go to Dataflow when deployed.
> >
> > -------- Original Message --------
> > On Apr 22, 2023, 13:13, Reuven Lax via user <user@beam.apache.org> wrote:
> >
> > > What runner are you using to run this pipeline?
> > >
> > > On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> > >
> > > > Same result:
> > > >
> > > > PCollection<String> result = p
> > > >     .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > >     .apply("Transform", ParDo.of(new MyTransformer()))
> > > >     .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
> > > >         .triggering(AfterWatermark.pastEndOfWindow()
> > > >             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
> > > >         .withAllowedLateness(Duration.standardMinutes(1))
> > > >         .discardingFiredPanes());
> > > >
> > > > PCollection<Void> insert = result.apply("Inserting", JdbcIO.<String>write()
> > > >     .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > >     .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > >     .withPreparedStatementSetter((element, preparedStatement) -> {
> > > >         log.info("Preparing statement to insert");
> > > >         preparedStatement.setString(1, element);
> > > >     })
> > > >     .withResults()
> > > > );
> > > > result.apply(Wait.on(insert))
> > > >     .apply("Selecting", new SomeTransform())
> > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > p.run();
> > > >
> > > > I updated the GitHub repo as well.
> > > >
> > > > ------- Original Message -------
> > > > On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <user@beam.apache.org> wrote:
> > > >
> > > > > The other problem you have here is that you have not set a window.
> > > > > Wait.on waits for the end of the current window before triggering.
> > > > > The default Window is the GlobalWindow, so as written Wait.on will
> > > > > wait for the end of time (or until you drain the pipeline, which will
> > > > > also trigger the GlobalWindow).
> > > > > Try adding a 1-minute fixed window to the results you read from
> > > > > PubSub.
> > > > >
> > > > > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> > > > >
> > > > > > writeVoid() and write() plus withResults() return the same
> > > > > > PCollection<Void> AFAIK. In any case, I updated the code and the same
> > > > > > thing happens:
> > > > > >
> > > > > > PCollection<String> result = p.
> > > > > >     apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > > > >     .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > >
> > > > > > PCollection<Void> insert = result.apply("Inserting", JdbcIO.<String>write()
> > > > > >     .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > > > >     .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > > > >     .withPreparedStatementSetter((element, preparedStatement) -> {
> > > > > >         log.info("Preparing statement to insert");
> > > > > >         preparedStatement.setString(1, element);
> > > > > >     })
> > > > > >     .withResults()
> > > > > > );
> > > > > > result.apply(Wait.on(insert))
> > > > > >     .apply("Selecting", new SomeTransform())
> > > > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > >
> > > > > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > > > > >
> > > > > > ------- Original Message -------
> > > > > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <user@beam.apache.org> wrote:
> > > > > >
> > > > > > > I believe you have to call withResults() on the JdbcIO transform
> > > > > > > in order for this to work.
> > > > > > >
> > > > > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> > > > > > >
> > > > > > > > I hope you all are doing well. I am facing an issue with an
> > > > > > > > Apache Beam pipeline that gets stuck indefinitely when using
> > > > > > > > the Wait.on transform alongside JdbcIO. Here's a simplified
> > > > > > > > version of my code, focusing on the relevant parts:
> > > > > > > >
> > > > > > > > PCollection<String> result = p.
> > > > > > > >     apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > > > > >     .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > > > >
> > > > > > > > PCollection<Void> insert = result.apply("Inserting", JdbcIO.<String>writeVoid()
> > > > > > > >     .withDataSourceProviderFn(/*...*/)
> > > > > > > >     .withStatement(/*...*/)
> > > > > > > >     .withPreparedStatementSetter(/*...*/)
> > > > > > > > );
> > > > > > > >
> > > > > > > > result.apply(Wait.on(insert))
> > > > > > > >     .apply("Selecting", new SomeTransform())
> > > > > > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > > > > p.run();
> > > > > > > >
> > > > > > > > In the code, I'm using the Wait.on transform to make the
> > > > > > > > pipeline wait until the insert transform (which uses JdbcIO to
> > > > > > > > write data) is completed before executing the next steps.
> > > > > > > > However, the pipeline gets stuck and doesn't progress further.
> > > > > > > >
> > > > > > > > I've tried adding logging messages in my transforms to track
> > > > > > > > the progress and identify where it's getting stuck, but I
> > > > > > > > haven't been able to pinpoint the issue. I've searched for
> > > > > > > > solutions online, but none of them provided a successful
> > > > > > > > resolution for my problem.
> > > > > > > >
> > > > > > > > Can anyone provide any insights or suggestions on how to debug
> > > > > > > > and resolve this issue involving Wait.on and JdbcIO in my
> > > > > > > > Apache Beam pipeline?
> > > > > > > >
> > > > > > > > You can find the sample code at:
> > > > > > > > https://github.com/j1cs/app-beam
> > > > > > > >
> > > > > > > > Thank you for your help and support.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Juan Cuzmar.
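To make the windowing argument in this thread concrete, here is a toy model in plain Java (no Beam dependency) of the rule Reuven describes: Wait.on only releases a window of the main input once the signal's watermark has passed the end of that window. Everything below is an illustrative assumption rather than Beam's API: the class `WaitOnWindowModel` and its methods are hypothetical, fixed windows are assumed to be aligned to the epoch with no offset, and the global window is modelled as ending at `Instant.MAX` ("end of time").

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical toy model (plain Java, not Beam's API) of the rule described in
 * this thread: Wait.on holds back a window of the main input until the signal
 * input's watermark has passed the end of that window.
 */
public class WaitOnWindowModel {

    /** A half-open event-time interval [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns a timestamp to an epoch-aligned fixed window of the given size (assumption: no offset). */
    static Window fixedWindowFor(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long startMillis = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return new Window(Instant.ofEpochMilli(startMillis), Instant.ofEpochMilli(startMillis + sizeMillis));
    }

    /** The single global window, modelled here (assumption) as ending at the end of time. */
    static Window globalWindow() {
        return new Window(Instant.MIN, Instant.MAX);
    }

    /** In this model, a window is complete once the watermark has reached its end. */
    static boolean isComplete(Window window, Instant watermark) {
        return !watermark.isBefore(window.end);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);

        // Same event times as the TestStream in the thread: 0 s, 5 s, 10 s, 15 s.
        long[] eventSeconds = {0, 5, 10, 15};
        for (long s : eventSeconds) {
            System.out.println("event at " + s + "s -> window "
                + fixedWindowFor(Instant.ofEpochSecond(s), oneMinute));
        }

        Window firstMinute = fixedWindowFor(Instant.ofEpochSecond(15), oneMinute);
        Instant watermarkAt20s = Instant.ofEpochSecond(20);
        Instant watermarkAt60s = Instant.ofEpochSecond(60);

        System.out.println("fixed window complete at 20s watermark: " + isComplete(firstMinute, watermarkAt20s));   // false
        System.out.println("fixed window complete at 60s watermark: " + isComplete(firstMinute, watermarkAt60s));   // true
        System.out.println("global window complete at 60s watermark: " + isComplete(globalWindow(), watermarkAt60s)); // false
    }
}
```

In this model, all four TestStream timestamps fall into the same [0 s, 60 s) window, so a watermark held at 20 s leaves it open and a watermark at 60 s or later completes it, while the global window stays open for any finite watermark. That mirrors the "end of time (or until you drain the pipeline)" remark above; it is a simplification and does not capture triggers, early firings, or allowed lateness.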