Hi Niels,

AFAICT, for that reason in KafkaIOIT [1] we use “.withMaxNumRecords(numRecords)”, where “numRecords” is actually the cardinality of the input dataset. It’s not 100% fair imo, since in this case the unbounded read becomes a BoundedReadFromUnboundedSource, and we still use “waitUntilFinish(timeout)”. Though I’m not sure PubsubIO supports this.
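To make the trade-off concrete, here is a plain-Java analogy (not Beam code) of what a bounded read over an unbounded source does: it stops after `maxNumRecords` elements or when a read-time budget runs out, whichever comes first. `BoundedReadSketch`, `readBounded` and the `BlockingQueue` standing in for a subscription are hypothetical names chosen for this sketch. In Beam, `Read.Unbounded` exposes the corresponding knobs as `withMaxNumRecords(long)` and `withMaxReadTime(Duration)`. If fewer records arrive than expected, only the time budget ends the read, which is why a timeout is still involved.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedReadSketch {

    // Hypothetical helper: drains `source` until `maxNumRecords` elements were read
    // or `maxReadTime` elapsed, whichever happens first.
    public static <T> List<T> readBounded(BlockingQueue<T> source, long maxNumRecords, Duration maxReadTime) {
        long deadline = System.nanoTime() + maxReadTime.toNanos();
        List<T> out = new ArrayList<>();
        while (out.size() < maxNumRecords) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time budget exhausted
            }
            T next;
            try {
                // Wait at most the remaining budget for the next record.
                next = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
            if (next == null) {
                break; // no record arrived before the deadline
            }
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        BlockingQueue<String> subscription = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            subscription.add("msg-" + i);
        }
        // Enough data: returns as soon as 3 records are read, without waiting.
        System.out.println(readBounded(subscription, 3, Duration.ofSeconds(10))); // [msg-0, msg-1, msg-2]
        // Only 2 records left but 3 expected: returns after roughly 200 ms.
        System.out.println(readBounded(subscription, 3, Duration.ofMillis(200))); // [msg-3, msg-4]
    }
}
```

With `maxNumRecords` equal to the input cardinality, the read finishes as soon as all test data is consumed; the time budget only matters when data goes missing.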
Also, I believe that “p.run()” should be asynchronous by default [2], and I guess it blocks only on the DirectRunner (perhaps for testing reasons).

[1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
[2] https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline

> On 16 Jul 2020, at 15:02, Niels Basjes <[email protected]> wrote:
>
> Hi,
>
> I found a way that seems to work.
>
> I have
> @Rule
> public final transient TestPipeline pipeline = TestPipeline.create();
> In the test I configure PubSub to connect to the emulator:
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
> options.setProject(PROJECT_NAME);
> options.setPubsubRootUrl(getPubsubRootUrl());
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
> options.setStreaming(true);
> I then hook my processing to this pipeline and afterwards I do this:
> pipeline
>     .getOptions()
>     .as(DirectOptions.class)
>     .setBlockOnRun(false);
> PipelineResult job = pipeline.run();
> long waitTime = 5000;
> LOG.info("Waiting ... {} seconds", waitTime/1000);
> job.waitUntilFinish(Duration.millis(waitTime));
>
> Although this works, it will fail my build on a slow machine.
> Is this the best option? Or can I inject a "stop" message into my stream and
> let the pipeline shut down once it sees that?
>
> Can you please indicate if this is a valid way of doing this?
> Thanks.
>
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
> Hi,
>
> I'm testing to see how I can build an integration test for a Beam application
> that uses PubSub.
>
> So I start the Google-provided PubSub emulator, create things like a topic and a
> subscription, put in some validation messages, then run the job against
> that and verify the data that comes out.
> I'm logging the events to the screen, and there I see the data coming in and
> being processed.
>
> The problem I have is that I have not been able to figure out how to cleanly
> terminate this stream after it has processed all my messages.
>
> I have also inserted some 'stop' messages to enable triggering a "we're done,
> you can stop now".
>
> I've been digging through the documentation and the APIs and found nothing that
> works.
>
> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>
> I have tried setting the timestamp of those to MAX_LONG and
> TIMESTAMP_MAX_VALUE, but that yielded exceptions.
>
> So far I have not been able to figure out how to tell the TestPipeline:
> finish what you have and shut down.
>
> How do I do that?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
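On the slow-machine concern with the fixed 5-second wait: one common alternative is to poll for the condition the test actually cares about (all expected records, or a "stop" marker, showing up in whatever output the test inspects) under a much larger upper bound, and only then stop the job, e.g. with `PipelineResult.cancel()`. Below is a plain-Java sketch of that polling loop, assuming the test can observe the pipeline's output somewhere; `AwaitOutputSketch`, `awaitCondition`, the in-memory `observed` list and the `STOP` marker are hypothetical stand-ins, not Beam APIs.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

public class AwaitOutputSketch {

    // Hypothetical helper: polls `done` until it returns true or `timeout` elapses.
    // Returns true if the condition was met, false on timeout or interruption.
    public static boolean awaitCondition(BooleanSupplier done, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (done.getAsBoolean()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            try {
                // Sleep for the poll interval, but never past the deadline (at least 1 ms).
                Thread.sleep(Math.min(pollInterval.toMillis(), Math.max(1, remaining / 1_000_000)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the sink the test would inspect (hypothetical).
        List<String> observed = new CopyOnWriteArrayList<>();
        Thread producer = new Thread(() -> {
            for (String m : new String[] {"a", "b", "STOP"}) {
                try {
                    Thread.sleep(50); // simulate records trickling in
                } catch (InterruptedException e) {
                    return;
                }
                observed.add(m);
            }
        });
        producer.start();
        // Wait for the stop marker with a generous bound instead of a fixed sleep.
        boolean finished = awaitCondition(() -> observed.contains("STOP"), Duration.ofSeconds(30), Duration.ofMillis(20));
        producer.join();
        System.out.println(finished + " " + observed); // true [a, b, STOP]
    }
}
```

In a Beam test, once the condition returns `true` the test could call `job.cancel()` on the non-blocking `PipelineResult` from the quoted code. In this sketch the pipeline does not stop by itself when it sees the marker; the test decides when to cancel, so a fast machine finishes quickly and a slow one simply waits longer.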
