For unit testing, I would recommend using TestStream + PAssert, with
TestStream replacing the PubSub source. You can see some examples in
LeaderBoardTest[1].
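
Below is a minimal sketch of that pattern. It assumes JUnit 4 and the Beam Java SDK, with the DirectRunner, are on the test classpath. The test class, the uppercase transform and the element values are made up for illustration and stand in for your real processing. The advanceWatermarkToInfinity() call answers the "how do I stop" question for a unit test. Once the watermark reaches infinity, the runner knows no more input is coming, so waitUntilFinish() returns on its own.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Hypothetical test: a TestStream stands in for the PubsubIO read. */
@RunWith(JUnit4.class)
public class UppercaseStreamTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void uppercasesEveryMessage() {
    // Scripted "stream": two timestamped elements, then the watermark jumps to
    // infinity, which tells the runner that no more input will arrive.
    TestStream<String> messages =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("a", new Instant(0L)),
                TimestampedValue.of("b", new Instant(1000L)))
            .advanceWatermarkToInfinity();

    // In the real job this would be PubsubIO.readStrings().fromSubscription(...).
    PCollection<String> output =
        pipeline
            .apply(messages)
            .apply(
                MapElements.into(TypeDescriptors.strings())
                    .via((String s) -> s.toUpperCase()));

    PAssert.that(output).containsInAnyOrder("A", "B");

    // Returns once the watermark has reached infinity and all work is done.
    pipeline.run().waitUntilFinish();
  }
}
```

Because the end of input is scripted, the test does not depend on wall-clock timing. A slow build machine only makes it take longer; it does not make it fail.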

For integration testing, I would recommend using a metric and cancelling
the pipeline from another thread once the condition is met. There is some
code in the TestDataflowRunner[2] that does this (that code checks the
PAssert success/failure metrics, but you can look for any metric you
want).
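
Below is a rough sketch of the metric-based approach. It assumes the Beam Java SDK and a job started without blocking (on the DirectRunner, DirectOptions.setBlockOnRun(false)). The "STOP" marker text, the "it-test" namespace, the counter name and the class and method names are all hypothetical. A DoFn increments a counter whenever it sees the marker. The test thread polls PipelineResult.metrics() until the counter reaches the expected value or a deadline passes, then calls cancel() if the job is still running. The deadline only caps how long a broken run can take. A healthy run stops as soon as the marker has been counted, so a slow machine has to beat a generous timeout rather than a fixed sleep.

```java
import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical helpers for stopping a streaming integration test. */
public final class StopOnMarker {

  static final String NAMESPACE = "it-test"; // hypothetical metric namespace
  static final String STOP_SEEN = "stopMarkersSeen"; // hypothetical counter name
  static final String STOP_MARKER = "STOP"; // hypothetical marker payload

  private StopOnMarker() {}

  /** Passes normal messages through; counts and drops stop markers. */
  public static class CountStopMarkers extends DoFn<String, String> {
    private final Counter stopSeen = Metrics.counter(NAMESPACE, STOP_SEEN);

    @ProcessElement
    public void processElement(@Element String message, OutputReceiver<String> out) {
      if (STOP_MARKER.equals(message)) {
        stopSeen.inc();
      } else {
        out.output(message);
      }
    }
  }

  /** Sums the attempted value of the stop counter across all steps. */
  static long stopMarkersSeen(PipelineResult result) {
    MetricsFilter filter =
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(NAMESPACE, STOP_SEEN))
            .build();
    long total = 0;
    for (MetricResult<Long> counter : result.metrics().queryMetrics(filter).getCounters()) {
      total += counter.getAttempted();
    }
    return total;
  }

  /**
   * Polls until expectedMarkers stop markers have been counted or the timeout
   * expires, then cancels the job if it is still running. Returns true only if
   * all markers were seen in time.
   */
  public static boolean awaitStopThenCancel(
      PipelineResult result, long expectedMarkers, long timeoutMillis)
      throws IOException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    boolean seen = false;
    try {
      while (System.currentTimeMillis() < deadline) {
        if (stopMarkersSeen(result) >= expectedMarkers) {
          seen = true;
          break;
        }
        if (result.getState().isTerminal()) {
          break; // the job ended (e.g. failed) before the markers arrived
        }
        Thread.sleep(200);
      }
    } finally {
      // Runs on the test thread while the runner executes the pipeline on its own threads.
      if (!result.getState().isTerminal()) {
        result.cancel();
      }
    }
    return seen;
  }
}
```

To use it, apply ParDo.of(new StopOnMarker.CountStopMarkers()) right after the PubsubIO read and publish the validation messages followed by one marker. Start the job non-blocking, then assert that awaitStopThenCancel(job, 1, 60_000) returns true. Pub/Sub does not guarantee delivery order by default, so seeing the marker does not by itself prove that every earlier message was processed. Counting the expected number of normal messages with a second counter is a more robust condition.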

1:
https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
2:
https://github.com/apache/beam/blob/de1c14777d3c6a1231361db12f3a0b9fd3b84b3e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java#L145

On Thu, Jul 16, 2020 at 11:16 AM Niels Basjes wrote:

> What is the cleanest way to detect that the pipeline can be cancelled?
> If the pipeline runs as intended, I think I should stop on a "stop"
> message. And in case of problems, I should stop on a timeout.
>
> Do you know of an example that does this?
>
> Niels
>
> On Thu, 16 Jul 2020, 18:34 Luke Cwik wrote:
>
>> Have you tried cancelling[1] the pipeline once your condition is met?
>>
>> 1:
>> https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>>
>> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko wrote:
>>
>>> Hi Niels,
>>>
>>> AFAICT, for that reason in KafkaIOIT [1] we use
>>> “.withMaxNumRecords(numRecords)”, where “numRecords” is the
>>> cardinality of the input dataset. It’s not 100% fair IMO, since in this case
>>> the UnboundedRead becomes a BoundedReadFromUnboundedSource, and we still use
>>> “waitUntilFinish(timeout)”. That said, I’m not sure PubsubIO supports this.
>>>
>>> Also, I believe that “p.run()” should be asynchronous by default [2], and
>>> I guess it blocks only on the DirectRunner (perhaps for testing
>>> reasons).
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>>> [2]
>>> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>>
>>> On 16 Jul 2020, at 15:02, Niels Basjes wrote:
>>>
>>> Hi,
>>>
>>> I found a way that seems to work.
>>>
>>> I have
>>>
>>>     @Rule
>>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>>
>>> In the test I configure PubSub to connect to the emulator:
>>>
>>>     PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>>>     options.setProject(PROJECT_NAME);
>>>     options.setPubsubRootUrl(getPubsubRootUrl());
>>>     options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>>>     options.setStreaming(true);
>>>
>>> I then hook my processing to this pipeline, and afterwards I do this:
>>>
>>>     pipeline
>>>         .getOptions()
>>>         .as(DirectOptions.class)
>>>         .setBlockOnRun(false);
>>>
>>>     PipelineResult job = pipeline.run();
>>>
>>>     long waitTime = 5000;
>>>     LOG.info("Waiting ... {} seconds", waitTime / 1000);
>>>     job.waitUntilFinish(Duration.millis(waitTime));
>>>
>>>
>>> Although this works, it will fail my build on a slow machine.
>>>
>>> Is this the best option? Or can I inject a "stop" message into my stream and
>>> let the pipeline shut down once it sees that?
>>>
>>> Can you please indicate if this is a valid way of doing this?
>>> Thanks.
>>>
>>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm testing to see how I can build an integration test for a Beam
>>>> application that uses PubSub.
>>>>
>>>> So I start the Google-provided PubSub emulator, create things like
>>>> the topic and subscription, put in some validation messages, and then run
>>>> the job against that and verify the data that comes out.
>>>> I'm logging the events to the screen and there I see the data coming in
>>>> and being processed.
>>>>
>>>> The problem I have is that I have not been able to figure out how to
>>>> cleanly terminate this stream after it has processed all my messages.
>>>>
>>>> I have also inserted some 'stop' messages so that I can signal "we're
>>>> done, you can stop now".
>>>>
>>>> I've been digging through documentation and the apis and found nothing
>>>> that works.
>>>>
>>>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>>>
>>>> I have tried setting the timestamp of those to MAX_LONG and
>>>> TIMESTAMP_MAX_VALUE, but that yielded exceptions.
>>>>
>>>> So far I have not been able to figure out how to tell the TestPipeline:
>>>> finish what you have and shut down.
>>>>
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
