Hi Niels,

AFAICT, for that reason in KafkaIOIT [1] we use 
".withMaxNumRecords(numRecords)", where "numRecords" is the cardinality 
of the input dataset. It's not 100% fair imo, since in this case UnboundedRead 
becomes BoundedReadFromUnboundedSource, and we still use 
"waitUntilFinish(timeout)". Though I'm not sure PubSubIO supports this.
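
To illustrate the idea without Beam, here is a rough plain-Java sketch. It is 
not how KafkaIO implements it; the class and method names are made up for 
this example. A reader over a never-ending source stops as soon as it has 
seen the expected number of records, and the timeout only acts as a safety 
net:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for "bounded read from an unbounded source".
class BoundedReadSketch {

    // Reads from a queue that never signals "end of data" until either
    // maxNumRecords have arrived or timeoutMillis has elapsed.
    static List<String> readAtMost(BlockingQueue<String> source,
                                   int maxNumRecords,
                                   long timeoutMillis) throws InterruptedException {
        List<String> out = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (out.size() < maxNumRecords) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // safety net: give up once the deadline has passed
            }
            String record = source.poll(remaining, TimeUnit.NANOSECONDS);
            if (record == null) {
                break; // poll timed out, nothing more arrived in time
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            source.add("msg-" + i);
        }
        // Stops after 3 records, long before the 10 second timeout.
        System.out.println(readAtMost(source, 3, 10_000));
    }
}
```

On a slow machine the read simply takes longer; the test only fails if the 
records really never arrive before the timeout.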

Also, I believe that "p.run()" should be asynchronous by default [2], and I 
guess it blocks only for DirectRunner (perhaps for testing reasons).

[1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
[2] https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline

> On 16 Jul 2020, at 15:02, Niels Basjes <[email protected]> wrote:
> 
> Hi,
> 
> I found a way that seems to work.
> 
> I have:
> 
> @Rule
> public final transient TestPipeline pipeline = TestPipeline.create();
> 
> In the test I configure PubSub to connect to the emulator:
> 
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
> options.setProject(PROJECT_NAME);
> options.setPubsubRootUrl(getPubsubRootUrl());
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
> options.setStreaming(true);
> 
> I then hook my processing to this pipeline and afterwards I do this:
> 
> pipeline
>         .getOptions()
>         .as(DirectOptions.class)
>         .setBlockOnRun(false);
> PipelineResult job = pipeline.run();
> long waitTime = 5000;
> LOG.info("Waiting ... {} seconds", waitTime/1000);
> job.waitUntilFinish(Duration.millis(waitTime));
> 
> Although this works it will fail my build on a slow machine.
> Is this the best option? Or can I inject a "stop" message in my stream and 
> let the pipeline shutdown once it sees that?
> 
> Can you please indicate if this is a valid way of doing this?
> Thanks.
> 
> 
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
> Hi,
> 
> I'm testing to see how I can build an integration test for a Beam application 
> that uses PubSub.
> 
> So I start the Google provided PubSub emulator, create things like topic and 
> subscription, put in some validation messages and then run the job against 
> that and verify the data that comes out.
> I'm logging the events to the screen and there I see the data coming in and 
> being processed.
> 
> The problem I have is that I have not been able to figure out how to cleanly 
> terminate this stream after it has processed all my messages. 
> 
> I have also inserted some 'stop' messages to enable triggering a "we're done, 
> you can stop now".
> 
> I've been digging through documentation and the apis and found nothing that 
> works.
> 
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
> 
> I have tried setting the timestamp of those to MAX_LONG and 
> TIMESTAMP_MAX_VALUE but that yielded exceptions.
> 
> So far I have not been able to figure out how to tell the TestPipeline: 
> Finish what you have and shutdown.
> 
> How do I do that?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
