Have you tried cancelling [1] the pipeline once your condition is met?

[1] https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
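Here is a minimal sketch of that suggestion. It assumes the test runs the pipeline non-blocking (for example with `setBlockOnRun(false)`) and can observe progress from the test thread. Rather than sleeping for a fixed time, it polls a condition up to a generous upper bound and then calls `cancel()` on the `PipelineResult`. `AwaitThenCancel`, its `Canceller` interface and the polling parameters are hypothetical and not Beam API. `Canceller` only mirrors the shape of `PipelineResult.cancel()`, which throws `IOException`, so `job::cancel` can be passed in.

```java
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not Beam API): waits for a condition, then cancels a running job. */
final class AwaitThenCancel {

  /** Mirrors the shape of PipelineResult.cancel(), so a method reference like job::cancel fits. */
  @FunctionalInterface
  interface Canceller {
    void cancel() throws IOException;
  }

  private AwaitThenCancel() {}

  /**
   * Polls the condition every pollInterval until it holds or the timeout elapses, then
   * invokes the canceller in either case so the streaming job never outlives the test.
   * Returns whether the condition was met before the timeout.
   */
  static boolean awaitThenCancel(
      BooleanSupplier condition, Duration timeout, Duration pollInterval, Canceller canceller)
      throws IOException, InterruptedException {
    Instant deadline = Instant.now().plus(timeout);
    boolean met = condition.getAsBoolean();
    while (!met && Instant.now().isBefore(deadline)) {
      Thread.sleep(pollInterval.toMillis());
      met = condition.getAsBoolean();
    }
    canceller.cancel();
    return met;
  }
}
```

In a test this might look like `AwaitThenCancel.awaitThenCancel(() -> SEEN.get() >= expected, java.time.Duration.ofMinutes(2), java.time.Duration.ofMillis(200), job::cancel)`. Here `SEEN` is a hypothetical static counter that the pipeline increments, and `expected` is the number of validation messages. A static counter is only visible to the test when the runner executes in the same JVM as the test, as the DirectRunner does. The loop returns as soon as the condition holds, so a slow machine makes the test slower but does not fail it until the upper bound is reached. Beam's `waitUntilFinish` takes a Joda-Time `Duration`, so a test file that uses both types must fully qualify one of them.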

On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko wrote:
> Hi Niels,
>
> AFAICT, this is why in KafkaIOIT [1] we use
> ".withMaxNumRecords(numRecords)", where "numRecords" is the cardinality
> of the input dataset. It's not 100% fair IMO, since in this case the
> UnboundedRead becomes a BoundedReadFromUnboundedSource, and we still use
> "waitUntilFinish(timeout)". Though, I'm not sure PubSubIO supports this.
>
> Also, I believe that "p.run()" should be asynchronous by default [2], and I
> guess it blocks only on the DirectRunner (perhaps for testing purposes).
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
> [2] https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>
> On 16 Jul 2020, at 15:02, Niels Basjes wrote:
>
> Hi,
>
> I found a way that seems to work.
>
> I have
>
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>
> In the test I configure PubSub to connect to the emulator:
>
>     PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>     options.setProject(PROJECT_NAME);
>     options.setPubsubRootUrl(getPubsubRootUrl());
>     options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>     options.setStreaming(true);
>
> I then hook my processing up to this pipeline, and afterwards I do this:
>
>     pipeline
>         .getOptions()
>         .as(DirectOptions.class)
>         .setBlockOnRun(false);
>
>     PipelineResult job = pipeline.run();
>
>     long waitTime = 5000;
>     LOG.info("Waiting ... {} seconds", waitTime / 1000);
>     job.waitUntilFinish(Duration.millis(waitTime));
>
> Although this works, it will fail my build on a slow machine.
>
> Is this the best option? Or can I inject a "stop" message into my stream and
> let the pipeline shut down once it sees that?
>
> Can you please indicate whether this is a valid way of doing this?
> Thanks.
>
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes wrote:
>
>> Hi,
>>
>> I'm trying to find out how to build an integration test for a Beam
>> application that uses PubSub.
>>
>> I start the Google-provided PubSub emulator, create the topic and
>> subscription, publish some validation messages, run the job against
>> that, and verify the data that comes out.
>> I'm logging the events to the screen, and there I see the data coming in
>> and being processed.
>>
>> The problem is that I have not been able to figure out how to cleanly
>> terminate this stream after it has processed all my messages.
>>
>> I have also inserted some 'stop' messages to trigger a "we're done, you
>> can stop now".
>>
>> I've been digging through the documentation and the APIs and found
>> nothing that works.
>>
>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>
>> I have tried setting the timestamp of those stop messages to MAX_LONG and
>> TIMESTAMP_MAX_VALUE, but that yielded exceptions.
>>
>> So far I have not been able to figure out how to tell the TestPipeline:
>> finish what you have and shut down.
>>
>> How do I do that?
>>
>> --
>> Best regards,
>>
>> Niels Basjes
>
> --
> Best regards,
>
> Niels Basjes
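The thread also raises the idea of a "stop" message. One possible sketch assumes the test and the pipeline share a JVM, as they do with the DirectRunner. The pipeline reports each payload to a static latch, and the test thread blocks on that latch with an upper bound, then cancels the job. `StopSignal`, its methods and the `STOP` marker are hypothetical names, not part of Beam or PubsubIO.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test-side signal for a "stop" message. Static state is only shared with
 * pipeline code when the runner executes in the same JVM as the test (e.g. the DirectRunner).
 */
final class StopSignal {
  /** Hypothetical sentinel payload; use whatever marker the test actually publishes. */
  static final String STOP_MARKER = "STOP";

  // volatile so that a reset() from the test thread is visible to pipeline threads
  private static volatile CountDownLatch stopSeen = new CountDownLatch(1);

  private StopSignal() {}

  /** Call at the start of each test so a signal from a previous test does not leak in. */
  static void reset() {
    stopSeen = new CountDownLatch(1);
  }

  /** Call from the pipeline for every element, e.g. inside a DoFn's @ProcessElement method. */
  static void observe(String payload) {
    if (STOP_MARKER.equals(payload)) {
      stopSeen.countDown();
    }
  }

  /** Blocks until the marker has been observed or the timeout elapses; true if observed. */
  static boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException {
    return stopSeen.await(timeout, unit);
  }
}
```

On the test side, you would call `pipeline.run()` with `setBlockOnRun(false)`, then `StopSignal.awaitStop(2, TimeUnit.MINUTES)`. If that returns `false`, fail the test; otherwise call `job.cancel()`. Pub/Sub does not guarantee ordered delivery by default, and a runner may process elements in parallel. Seeing the stop message therefore does not prove that every earlier validation message has been processed. Counting the expected number of messages, as `withMaxNumRecords(numRecords)` does, is the more robust condition.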
