Hi,

I found a way that seems to work.

I have:

@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

In the test I configure PubSub to connect to the emulator:

PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
options.setProject(PROJECT_NAME);
options.setPubsubRootUrl(getPubsubRootUrl());
options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
options.setStreaming(true);

I then hook my processing to this pipeline and afterwards I do this:

pipeline
    .getOptions()
    .as(DirectOptions.class)
    .setBlockOnRun(false);

PipelineResult job = pipeline.run();

long waitTime = 5000;
LOG.info("Waiting ... {} seconds", waitTime/1000);
job.waitUntilFinish(Duration.millis(waitTime));


Although this works, the fixed wait time means it will fail my build on a
slow machine.

Is this the best option? Or can I inject a "stop" message into my stream
and let the pipeline shut down once it sees that?
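
To make that second option concrete, here is a minimal sketch of what I
have in mind. It assumes the DirectRunner executes my DoFn in the same JVM
as the test, so that a static field is visible to both. StopSignal and its
methods are hypothetical names of my own, not part of Beam.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets a test wait for a "stop" marker instead of a fixed sleep. */
public final class StopSignal {
    // Static so that code running in the same JVM as the test can reach it
    // (assumption: the runner does not move the DoFn to another process).
    private static volatile CountDownLatch seen = new CountDownLatch(1);

    private StopSignal() {}

    /** Re-arm the signal; call this at the start of every test. */
    public static void reset() {
        seen = new CountDownLatch(1);
    }

    /** Called by the processing code when it encounters the "stop" message. */
    public static void markSeen() {
        seen.countDown();
    }

    /**
     * Blocks until markSeen() has been called or the timeout expires.
     * Returns true if the stop message was seen in time, false otherwise.
     */
    public static boolean await(long timeout, TimeUnit unit) {
        try {
            return seen.await(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report "not seen".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The idea would be that a DoFn calls StopSignal.markSeen() when it sees the
"stop" message, and the test calls StopSignal.await(...) with a generous
timeout right after pipeline.run(), then cancels the job (I assume
PipelineResult.cancel() is the call for that). A fast machine would then
finish as soon as the marker arrives, and a slow one would get the full
timeout before the build fails.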

Can you please indicate if this is a valid way of doing this?
Thanks.

On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:

> Hi,
>
> I'm testing to see how I can build an integration test for a Beam
> application that uses PubSub.
>
> So I start the Google provided PubSub emulator, create things like topic
> and subscription, put in some validation messages and then run the job
> against that and verify the data that comes out.
> I'm logging the events to the screen and there I see the data coming in
> and being processed.
>
> The problem I have is that I have not been able to figure out how to
> cleanly terminate this stream after it has processed all my messages.
>
> I have also inserted some 'stop' messages to enable triggering a "we're
> done, you can stop now".
>
> I've been digging through the documentation and the APIs and found nothing
> that works.
>
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>
> I have tried setting the timestamp of those to MAX_LONG and
> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>
> So far I have not been able to figure out how to tell the TestPipeline:
> Finish what you have and shut down.
>
> How do I do that?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
