For unit testing, I would recommend using TestStream + PAssert, where the TestStream replaces the Pubsub source. You can see some examples in LeaderBoardTest [1].
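A minimal sketch of that pattern (the class name, the upper-casing transform, and the test data are placeholders, not from LeaderBoardTest; in a real test the TestStream stands in for the PubsubIO read):

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class MyStreamingJobTest {

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testProcessing() {
    // The TestStream replaces the Pubsub source: elements and watermark
    // advances are fully under the test's control, so the pipeline
    // terminates deterministically instead of hanging on an unbounded read.
    TestStream<String> messages =
        TestStream.create(StringUtf8Coder.of())
            .addElements("a", "b")
            .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
            .addElements("c")
            .advanceWatermarkToInfinity(); // ends the stream cleanly

    PCollection<String> output =
        pipeline
            .apply(messages)
            .apply(MapElements.into(TypeDescriptors.strings())
                .via(s -> s.toUpperCase()));

    PAssert.that(output).containsInAnyOrder("A", "B", "C");
    pipeline.run().waitUntilFinish();
  }
}
```

Because `advanceWatermarkToInfinity()` closes the stream, `waitUntilFinish()` returns without any timeout, which is exactly the clean-shutdown behavior that is hard to get from the real Pubsub source.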
For integration testing, I would recommend using a metric and cancelling the pipeline from another thread once the condition is met. There is some code in the TestDataflowRunner [2] that does this (that code checks for the PAssert success/failure metrics, but you can poll for any metric you want).

1: https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
2: https://github.com/apache/beam/blob/de1c14777d3c6a1231361db12f3a0b9fd3b84b3e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java#L145

On Thu, Jul 16, 2020 at 11:16 AM Niels Basjes <[email protected]> wrote:

> What is the cleanest way to detect that the pipeline can be cancelled?
> If the pipeline runs as intended I think I should stop on a "stop"
> message, and in case of problems I should stop on a timeout.
>
> Do you know of an example that does this?
>
> Niels
>
> On Thu, 16 Jul 2020, 18:34 Luke Cwik, <[email protected]> wrote:
>
>> Have you tried cancelling [1] the pipeline once your condition is met?
>>
>> 1: https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>>
>> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <[email protected]> wrote:
>>
>>> Hi Niels,
>>>
>>> AFAICT, for that reason in KafkaIOIT [1] we use
>>> ".withMaxNumRecords(numRecords)", where "numRecords" is the
>>> cardinality of the input dataset. It's not 100% fair, IMO, since in
>>> this case the UnboundedRead becomes a BoundedReadFromUnboundedSource,
>>> and we still use "waitUntilFinish(timeout)". Though, I'm not sure
>>> PubSubIO supports this.
>>>
>>> Also, I believe that "p.run()" should be asynchronous by default [2],
>>> and I guess it blocks only for the DirectRunner (presumably for
>>> testing reasons).
>>>
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>>> [2] https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>>
>>> On 16 Jul 2020, at 15:02, Niels Basjes <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> I found a way that seems to work.
>>>
>>> I have
>>>
>>>     @Rule
>>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>>
>>> In the test I configure PubSub to connect to the emulator:
>>>
>>>     PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>>>     options.setProject(PROJECT_NAME);
>>>     options.setPubsubRootUrl(getPubsubRootUrl());
>>>     options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>>>     options.setStreaming(true);
>>>
>>> I then hook my processing to this pipeline, and afterwards I do this:
>>>
>>>     pipeline
>>>         .getOptions()
>>>         .as(DirectOptions.class)
>>>         .setBlockOnRun(false);
>>>
>>>     PipelineResult job = pipeline.run();
>>>
>>>     long waitTime = 5000;
>>>     LOG.info("Waiting ... {} seconds", waitTime / 1000);
>>>     job.waitUntilFinish(Duration.millis(waitTime));
>>>
>>> Although this works, it will fail my build on a slow machine.
>>>
>>> Is this the best option? Or can I inject a "stop" message into my
>>> stream and let the pipeline shut down once it sees that?
>>>
>>> Can you please indicate if this is a valid way of doing this?
>>> Thanks.
>>>
>>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm testing to see how I can build an integration test for a Beam
>>>> application that uses PubSub.
>>>>
>>>> So I start the Google-provided PubSub emulator, create things like a
>>>> topic and a subscription, put in some validation messages, and then
>>>> run the job against that and verify the data that comes out.
>>>> I'm logging the events to the screen, and there I see the data
>>>> coming in and being processed.
>>>>
>>>> The problem I have is that I have not been able to figure out how to
>>>> cleanly terminate this stream after it has processed all my messages.
>>>>
>>>> I have also inserted some 'stop' messages to enable triggering a
>>>> "we're done, you can stop now".
>>>>
>>>> I've been digging through the documentation and the APIs and found
>>>> nothing that works.
>>>>
>>>> This does not work:
>>>>
>>>>     pipeline.run().waitUntilFinish(Duration.millis(2000))
>>>>
>>>> I have tried setting the timestamp of those to MAX_LONG and
>>>> TIMESTAMP_MAX_VALUE, but that yielded exceptions.
>>>>
>>>> So far I have not been able to figure out how to tell the
>>>> TestPipeline: finish what you have and shut down.
>>>>
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
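The metric-based shutdown recommended at the top of the thread boils down to: run the pipeline without blocking, poll a condition (a metric, in the Beam case) from a watcher thread, and cancel either when the condition is met or when a deadline passes. Here is a runner-agnostic sketch of just that control flow, with the Beam-specific calls (querying the counter via `PipelineResult.metrics()`, and `PipelineResult.cancel()`) replaced by stand-ins so the shape is visible without a cluster:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Polls a condition until it holds or a deadline passes, then cancels. */
public final class PipelineWatcher {

  /**
   * @param conditionMet in a real Beam test, a check of a custom
   *     "stop message seen" counter via PipelineResult.metrics()
   *     (hypothetical here)
   * @param cancel in a real Beam test, PipelineResult::cancel
   * @return true if the condition was met before the deadline,
   *     false if we gave up on timeout
   */
  public static boolean awaitAndCancel(
      BooleanSupplier conditionMet, Runnable cancel,
      long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    boolean met = false;
    while (System.nanoTime() < deadline) {
      if (conditionMet.getAsBoolean()) {
        met = true;
        break;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    // Cancel on success *and* on timeout, so the build never hangs.
    cancel.run();
    return met;
  }
}
```

In the real test you would call this from a separate thread right after `pipeline.run()` (with `setBlockOnRun(false)` on the DirectRunner), and have `conditionMet` query the metric that your DoFn increments when it sees the "stop" message; that is the same structure TestDataflowRunner uses for the PAssert success/failure counters. Unlike a bare `waitUntilFinish(timeout)`, a slow machine only makes the test slower, not red, as long as the messages eventually arrive within the (generous) deadline.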
