What is the cleanest way to detect that the pipeline can be cancelled? If the pipeline runs as intended, I think I should stop on a "stop" message, and in case of problems I should stop on a timeout.
Do you know of an example that does this?

Niels

On Thu, 16 Jul 2020, 18:34 Luke Cwik, <[email protected]> wrote:

> Have you tried cancelling[1] the pipeline once your condition is met?
>
> 1:
> https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>
> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <[email protected]>
> wrote:
>
>> Hi Niels,
>>
>> AFAICT, for that reason in KafkaIOIT [1] we use
>> “.withMaxNumRecords(numRecords)” where “numRecords” is actually the
>> cardinality of the input dataset. It’s not 100% fair imo, since in this case
>> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
>> “waitUntilFinish(timeout)”. Though, I’m not sure PubSubIO supports this.
>>
>> Also, I believe that “p.run()” should be asynchronous by default [2] and
>> I guess it’s blocking only for DirectRunner (perhaps for testing
>> reasons).
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>> [2]
>> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>
>> On 16 Jul 2020, at 15:02, Niels Basjes <[email protected]> wrote:
>>
>> Hi,
>>
>> I found a way that seems to work.
>>
>> I have
>>
>> @Rule
>> public final transient TestPipeline pipeline = TestPipeline.create();
>>
>> in the test I configure PubSub to connect to the emulator
>>
>> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>> options.setProject(PROJECT_NAME);
>> options.setPubsubRootUrl(getPubsubRootUrl());
>> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>> options.setStreaming(true);
>>
>> I then hook my processing to this pipeline and afterwards I do this:
>>
>> pipeline
>>     .getOptions()
>>     .as(DirectOptions.class)
>>     .setBlockOnRun(false);
>>
>> PipelineResult job = pipeline.run();
>>
>> long waitTime = 5000;
>> LOG.info("Waiting ... {} seconds", waitTime/1000);
>> job.waitUntilFinish(Duration.millis(waitTime));
>>
>> Although this works it will fail my build on a slow machine.
>>
>> Is this the best option? Or can I inject a "stop" message in my stream and
>> let the pipeline shut down once it sees that?
>>
>> Can you please indicate if this is a valid way of doing this?
>> Thanks.
>>
>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I'm testing to see how I can build an integration test for a Beam
>>> application that uses PubSub.
>>>
>>> So I start the Google provided PubSub emulator, create things like topic
>>> and subscription, put in some validation messages and then run the job
>>> against that and verify the data that comes out.
>>> I'm logging the events to the screen and there I see the data coming in
>>> and being processed.
>>>
>>> The problem I have is that I have not been able to figure out how to
>>> cleanly terminate this stream after it has processed all my messages.
>>>
>>> I have also inserted some 'stop' messages to enable triggering a "we're
>>> done, you can stop now".
>>>
>>> I've been digging through documentation and the APIs and found nothing
>>> that works.
>>>
>>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>>
>>> I have tried setting the timestamp of those to MAX_LONG and
>>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>>
>>> So far I have not been able to figure out how to tell the TestPipeline:
>>> Finish what you have and shut down.
>>>
>>> How do I do that?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
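
The "stop on a stop message, otherwise stop on a timeout" idea from this thread can be sketched with plain java.util.concurrent primitives. This is only an illustration of the waiting logic, not Beam code: the class StopOrTimeoutSketch, the Outcome enum, and fakeStreamingJob are hypothetical names, and the background thread merely stands in for a streaming pipeline started with setBlockOnRun(false).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from Beam or from the thread.
class StopOrTimeoutSketch {

    enum Outcome { STOP_SEEN, TIMED_OUT }

    // Stand-in for the streaming job: it handles the given messages and then
    // idles, like an unbounded source, until the thread is interrupted.
    static Runnable fakeStreamingJob(List<String> messages, CountDownLatch stopSeen) {
        return () -> {
            for (String message : messages) {
                if ("stop".equals(message)) {
                    stopSeen.countDown(); // signal the waiting test thread
                }
            }
            try {
                Thread.sleep(Long.MAX_VALUE); // keep "running" until cancelled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Starts the job asynchronously, waits for the stop signal or the timeout
    // (whichever comes first), and cancels the job in both cases.
    static Outcome runUntilStopOrTimeout(List<String> messages, long timeoutMillis) {
        CountDownLatch stopSeen = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> job = executor.submit(fakeStreamingJob(messages, stopSeen));
        try {
            boolean sawStop = stopSeen.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return sawStop ? Outcome.STOP_SEEN : Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.TIMED_OUT; // treat an interrupted wait like a timeout
        } finally {
            job.cancel(true);       // always cancel the still-running job
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Normal case: the stop message arrives well before the timeout.
        System.out.println(runUntilStopOrTimeout(Arrays.asList("a", "b", "stop"), 5000));
        // Problem case: no stop message, so the timeout ends the wait.
        System.out.println(runUntilStopOrTimeout(Arrays.asList("a", "b"), 200));
    }
}
```

In a real test the latch would presumably have to be a static field counted down from a DoFn that recognises the stop element, and the Future.cancel call would be replaced by the PipelineResult.cancel() method Luke linked to. That mapping is an assumption: it relies on the DirectRunner executing the DoFn in the same JVM as the test. The advantage over a fixed waitUntilFinish(5000) is that a fast machine returns as soon as the stop message is seen, so the timeout can be generous enough for slow build machines.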
