I ran the following test. It inserted the data correctly, but when I try to pull 
the data, it never arrives.
        Pipeline p = Pipeline.create(options);

        Coder<String> utf8Coder = StringUtf8Coder.of();
        Coder<Map<String, String>> mapCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
        Coder<KV<String, Map<String, String>>> kvCoder = KvCoder.of(utf8Coder, mapCoder);

        // Four events 5 s apart; then the watermark moves to 20 s and finally to infinity.
        TestStream<KV<String, Map<String, String>>> testStream = TestStream.create(kvCoder)
                .addElements(
                        TimestampedValue.of(KV.of("event0", Collections.singletonMap("test", "test")), new Instant(0 * 1000)),
                        TimestampedValue.of(KV.of("event1", Collections.singletonMap("test", "test")), new Instant(5 * 1000)),
                        TimestampedValue.of(KV.of("event2", Collections.singletonMap("test", "test")), new Instant(10 * 1000)),
                        TimestampedValue.of(KV.of("event3", Collections.singletonMap("test", "test")), new Instant(15 * 1000)))
                .advanceWatermarkTo(new Instant(20 * 1000))
                .advanceWatermarkToInfinity();

        // Apply the TestStream only once. In production this input would instead be
        // PubsubIO.readMessagesWithAttributes().fromSubscription(...) followed by MyTransformer.
        PCollection<KV<String, Map<String, String>>> simulatedPubsubEvents = p.apply("TestStream", testStream);

        PCollection<String> result = simulatedPubsubEvents
                .apply("Transform", ParDo.of(new NewTransform()))
                .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardSeconds(30))))
                        .withAllowedLateness(Duration.standardMinutes(1))
                        .discardingFiredPanes());

        PCollection<Void> insert = result.apply("Inserting",
                JdbcIO.<String>write()
                        .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
                        .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
                        .withPreparedStatementSetter((element, preparedStatement) -> {
                            log.info("Preparing statement to insert");
                            preparedStatement.setString(1, element);
                        })
                        .withResults()
        );
        result.apply(Wait.on(insert))
                .apply("Selecting", new SomeTransform())
                .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
        p.run().waitUntilFinish();
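To see when the 1-minute windows in the test above can close, here is a small stdlib-only model of fixed-window assignment. It is not Beam code, and the class and method names (`FixedWindowModel`, `windowStart`, `isClosed`) are hypothetical. It assumes Beam's zero-offset FixedWindows rule (an element at timestamp t lands in [t - t mod size, t - t mod size + size)) and that `AfterWatermark.pastEndOfWindow()` can only fire once the watermark reaches the window end.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of FixedWindows assignment (zero offset), in epoch milliseconds.
// It only mirrors the arithmetic; it does not use or reproduce Beam's classes.
public class FixedWindowModel {
    static final long SIZE_MS = 60_000L; // FixedWindows.of(Duration.standardMinutes(1))

    // Start of the fixed window that contains timestamp t.
    static long windowStart(long t) {
        return t - Math.floorMod(t, SIZE_MS);
    }

    // Assumption: window [start, start + SIZE_MS) is complete once the watermark reaches its end.
    static boolean isClosed(long start, long watermark) {
        return watermark >= start + SIZE_MS;
    }

    public static void main(String[] args) {
        // Event timestamps used in the TestStream: 0 s, 5 s, 10 s, 15 s.
        List<Long> events = List.of(0L, 5_000L, 10_000L, 15_000L);
        Map<Long, Integer> countsPerWindow = new TreeMap<>();
        for (long t : events) {
            countsPerWindow.merge(windowStart(t), 1, Integer::sum);
        }
        System.out.println("window start -> element count: " + countsPerWindow);

        // Watermark steps from the TestStream: 20 s, then "infinity".
        long[] watermarks = {20_000L, Long.MAX_VALUE};
        for (long wm : watermarks) {
            System.out.println("watermark " + wm + " closes [0 s, 60 s): " + isClosed(0L, wm));
        }
    }
}
```

Under this model all four events fall into the single window [0 s, 60 s), and advancing the watermark to 20 s does not close it; only `advanceWatermarkToInfinity()` does. So, under these assumptions, anything held back by `Wait.on` in this test would be released only at that last step.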
As far as I can tell, I have to develop with TestStream and use Pub/Sub in 
production? That seems like a very complicated approach to me.

------- Original Message -------
On Saturday, April 22nd, 2023 at 1:39 PM, Reuven Lax via user 
<user@beam.apache.org> wrote:


> Oh, in that case the problem may be the direct runner's implementation of 
> the Pub/Sub source, especially its watermark. For a direct-runner test, I 
> recommend using TestStream (which lets you advance the watermark manually, 
> so you can test windowing).
> 
> On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
> 
> > I'm developing with the direct runner, but it will run on Dataflow when deployed.
> > 
> > 
> > -------- Original Message --------
> > On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote:
> > 
> > > 
> > > What runner are you using to run this pipeline?
> > > 
> > > On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> 
> > > wrote:
> > > 
> > > > Same result:
> > > > PCollection<String> result = p
> > > > .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > > .apply("Transform", ParDo.of(new MyTransformer()))
> > > > .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
> > > > .triggering(AfterWatermark.pastEndOfWindow()
> > > > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
> > > > .withAllowedLateness(Duration.standardMinutes(1))
> > > > .discardingFiredPanes());
> > > > 
> > > > PCollection<Void> insert = result.apply("Inserting",
> > > > JdbcIO.<String>write()
> > > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > > log.info("Preparing statement to insert");
> > > > preparedStatement.setString(1, element);
> > > > })
> > > > .withResults()
> > > > );
> > > > result.apply(Wait.on(insert))
> > > > .apply("Selecting", new SomeTransform())
> > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > p.run();
> > > > 
> > > > I updated the GitHub repo as well.
> > > > 
> > > > ------- Original Message -------
> > > > On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user 
> > > > <user@beam.apache.org> wrote:
> > > > 
> > > > 
> > > > > The other problem you have here is that you have not set a window. 
> > > > > Wait.on waits for the end of the current window before triggering. 
> > > > > The default Window is the GlobalWindow, so as written Wait.on will 
> > > > > wait for the end of time (or until you drain the pipeline, which will 
> > > > > also trigger the GlobalWindow).
> > > > > Try adding a 1-minute fixed window to the results you read from 
> > > > > PubSub.
> > > > >
> > > > > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> 
> > > > > wrote:
> > > > >
> > > > > > As far as I know, writeVoid() and write().withResults() both return 
> > > > > > a PCollection<Void>. In any case, I updated the code and the same 
> > > > > > thing happens:
> > > > > >
> > > > > > PCollection<String> result = p
> > > > > > .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > >
> > > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > > > JdbcIO.<String>write()
> > > > > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > > > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > > > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > > > > log.info("Preparing statement to insert");
> > > > > > preparedStatement.setString(1, element);
> > > > > > })
> > > > > > .withResults()
> > > > > > );
> > > > > > result.apply(Wait.on(insert))
> > > > > > .apply("Selecting", new SomeTransform())
> > > > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > > > > >
> > > > > > ------- Original Message -------
> > > > > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user 
> > > > > > <user@beam.apache.org> wrote:
> > > > > >
> > > > > >
> > > > > > > I believe you have to call withResults() on the JdbcIO transform 
> > > > > > > in order for this to work.
> > > > > > >
> > > > > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar 
> > > > > > > <jcuz...@protonmail.com> wrote:
> > > > > > >
> > > > > > > > I hope you all are doing well. I am facing an issue with an 
> > > > > > > > Apache Beam pipeline that gets stuck indefinitely when using 
> > > > > > > > the Wait.on transform alongside JdbcIO. Here's a simplified 
> > > > > > > > version of my code, focusing on the relevant parts:
> > > > > > > >
> > > > > > > > PCollection<String> result = p
> > > > > > > > .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > > > >
> > > > > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > > > > > JdbcIO.<String>writeVoid()
> > > > > > > > .withDataSourceProviderFn(/*...*/)
> > > > > > > > .withStatement(/*...*/)
> > > > > > > > .withPreparedStatementSetter(/*...*/)
> > > > > > > > );
> > > > > > > >
> > > > > > > > result.apply(Wait.on(insert))
> > > > > > > > .apply("Selecting", new SomeTransform())
> > > > > > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > > > > p.run();
> > > > > > > >
> > > > > > > > In the code, I'm using the Wait.on transform to make the 
> > > > > > > > pipeline wait until the insert transform (which uses JdbcIO to 
> > > > > > > > write data) is completed before executing the next steps. 
> > > > > > > > However, the pipeline gets stuck and doesn't progress further.
> > > > > > > >
> > > > > > > > I've tried adding logging messages in my transforms to track 
> > > > > > > > the progress and identify where it's getting stuck, but I 
> > > > > > > > haven't been able to pinpoint the issue. I've searched for 
> > > > > > > > solutions online, but none of them solved my problem.
> > > > > > > >
> > > > > > > > Can anyone provide any insights or suggestions on how to debug 
> > > > > > > > and resolve this issue involving Wait.on and JdbcIO in my 
> > > > > > > > Apache Beam pipeline?
> > > > > > > >
> > > > > > > > You can find the sample code at: 
> > > > > > > > https://github.com/j1cs/app-beam
> > > > > > > >
> > > > > > > > Thank you for your help and support.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Juan Cuzmar.
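The explanation given above in the thread for why the original pipeline hangs (Wait.on waits for the end of the current window, and the default GlobalWindow only ends at the end of time or when the pipeline is drained) can be illustrated with a second toy model. This is a hypothetical, stdlib-only sketch rather than Beam's implementation: `WaitOnModel`, `windowEnd`, `released`, and `END_OF_TIME` are illustrative names, a window size of 0 stands for "no windowing" (the GlobalWindow), and an element is assumed to be released once the signal's watermark reaches the end of its window.

```java
// Hypothetical model of when Wait.on would release a main-input element.
// Not Beam code: END_OF_TIME is a stand-in for the end of the GlobalWindow.
public class WaitOnModel {
    static final long END_OF_TIME = Long.MAX_VALUE;

    // End of the window holding timestamp t; sizeMs <= 0 models the GlobalWindow.
    static long windowEnd(long t, long sizeMs) {
        if (sizeMs <= 0) {
            return END_OF_TIME;
        }
        return t - Math.floorMod(t, sizeMs) + sizeMs;
    }

    // Assumption: the element is released once the signal watermark reaches its window end.
    static boolean released(long t, long sizeMs, long watermark) {
        return watermark >= windowEnd(t, sizeMs);
    }

    public static void main(String[] args) {
        long element = 30_000L;      // an element stamped at 30 s
        long watermark = 3_600_000L; // one hour later, the streaming job is still running
        System.out.println("GlobalWindow released: " + released(element, 0L, watermark));
        System.out.println("1-minute window released: " + released(element, 60_000L, watermark));
    }
}
```

In this model, draining the pipeline corresponds to pushing the watermark to `END_OF_TIME`, which matches the point made in the thread that draining also triggers the GlobalWindow.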
