Hi,
I found a way that seems to work.
I have this rule:
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
In the test I configure PubSub to connect to the emulator:
PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
options.setProject(PROJECT_NAME);
options.setPubsubRootUrl(getPubsubRootUrl());
options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
options.setStreaming(true);
I then hook my processing to this pipeline and afterwards I do this:
pipeline
    .getOptions()
    .as(DirectOptions.class)
    .setBlockOnRun(false);
PipelineResult job = pipeline.run();
long waitTime = 5000;
LOG.info("Waiting ... {} seconds", waitTime/1000);
job.waitUntilFinish(Duration.millis(waitTime));
Although this works, the fixed 5 second wait will fail my build on a slow machine.
Is this the best option? Or can I inject a "stop" message in my stream
and let the pipeline shut down once it sees that?
Can you please indicate if this is a valid way of doing this?
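To make the "stop" message idea concrete, here is a minimal sketch of the waiting side, assuming the test has some way to observe that the stop message was processed. It only uses the JDK; PollUntil and await are hypothetical names of my own, not Beam API. It polls a condition until it is true or a generous upper bound elapses, so a fast machine finishes early and a slow machine still gets enough time.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper (not part of Beam): wait for a condition with an upper bound. */
final class PollUntil {
    private PollUntil() {}

    /**
     * Checks `done` every pollMillis until it returns true or timeoutMillis elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean await(BooleanSupplier done, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (done.getAsBoolean()) {
                return true;
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test I would then call something like PollUntil.await(() -> stopSeen.get(), 60000, 100) after pipeline.run(), followed by job.cancel(), where stopSeen is an assumed static AtomicBoolean that my own processing sets when it sees the stop message. That flag only works if the runner executes in the same JVM as the test, which I assume is the case for the DirectRunner.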
Thanks.
On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
> Hi,
>
> I'm testing to see how I can build an integration test for a Beam
> application that uses PubSub.
>
> So I start the Google provided PubSub emulator, create things like topic
> and subscription, put in some validation messages and then run the job
> against that and verify the data that comes out.
> I'm logging the events to the screen and there I see the data coming in
> and being processed.
>
> The problem I have is that I have not been able to figure out how to
> cleanly terminate this stream after it has processed all my messages.
>
> I have also inserted some 'stop' messages to enable triggering a "we're
> done, you can stop now".
>
> I've been digging through the documentation and the APIs and found nothing
> that works.
>
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>
> I have tried setting the timestamp of those 'stop' messages to MAX_LONG and
> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>
> So far I have not been able to figure out how to tell the TestPipeline:
> Finish what you have and shut down.
>
> How do I do that?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
--
Best regards / Met vriendelijke groeten,
Niels Basjes