Have you tried cancelling[1] the pipeline once your condition is met?

1:
https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
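
A minimal sketch of that idea, under stated assumptions: the helper polls a test-supplied condition until it holds or a deadline passes, then always runs the cancel action. The class name `CancelWhenDone`, the `CancelAction` interface, and `awaitThenCancel` are hypothetical names for illustration and are not part of Beam. The block uses only the JDK so it can be tried in isolation; in a real test the cancel action would be the `cancel()` call on the `PipelineResult` returned by a non-blocking `pipeline.run()`.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class CancelWhenDone {

  /** Hypothetical stand-in for a call such as PipelineResult#cancel(), which may throw. */
  @FunctionalInterface
  public interface CancelAction {
    void cancel() throws Exception;
  }

  /**
   * Polls {@code done} until it returns true or {@code timeout} elapses,
   * then runs {@code cancel} in either case.
   *
   * @return true if the condition was met before the deadline, false on timeout
   */
  public static boolean awaitThenCancel(
      BooleanSupplier done, CancelAction cancel, Duration timeout, Duration pollInterval)
      throws Exception {
    long deadline = System.nanoTime() + timeout.toNanos();
    boolean met = done.getAsBoolean();
    try {
      // Compare via subtraction so the check stays correct if nanoTime wraps.
      while (!met && System.nanoTime() - deadline < 0) {
        Thread.sleep(pollInterval.toMillis());
        met = done.getAsBoolean();
      }
    } finally {
      // Stop the job whether the condition was met, timed out, or we were interrupted.
      cancel.cancel();
    }
    return met;
  }
}
```

In a test this might be called as `awaitThenCancel(() -> received.size() >= expected, job::cancel, Duration.ofSeconds(60), Duration.ofMillis(200))`, where `job` is the `PipelineResult` and `received` is a hypothetical thread-safe collection that the test fills from the pipeline's output. The timeout then acts only as a generous upper bound, so a slow machine makes the test slower rather than making it fail.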

On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <[email protected]>
wrote:

> Hi Niels,
>
> AFAICT, for that reason in KafkaIOIT [1] we use
> “.withMaxNumRecords(numRecords)” where “numRecords” is actually the
> cardinality of the input dataset. It’s not 100% fair imo, since in this case
> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
> “waitUntilFinish(timeout)”. Though, I’m not sure PubSubIO supports this.
>
> Also, I believe that “p.run()” should be asynchronous by default [2] and I
> guess it blocks only for DirectRunner (perhaps for testing reasons).
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
> [2]
> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>
> On 16 Jul 2020, at 15:02, Niels Basjes <[email protected]> wrote:
>
> Hi,
>
> I found a way that seems to work.
>
> I have
>
> @Rule
> public final transient TestPipeline pipeline = TestPipeline.create();
>
> in the test I configure PubSub to connect to the emulator
>
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
> options.setProject(PROJECT_NAME);
> options.setPubsubRootUrl(getPubsubRootUrl());
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
> options.setStreaming(true);
>
> I then hook my processing to this pipeline and afterwards I do this:
>
> pipeline
>         .getOptions()
>         .as(DirectOptions.class)
>         .setBlockOnRun(false);
>
> PipelineResult job = pipeline.run();
>
> long waitTime = 5000;
> LOG.info("Waiting ... {} seconds", waitTime/1000);
> job.waitUntilFinish(Duration.millis(waitTime));
>
>
> Although this works, it will fail my build on a slow machine.
>
> Is this the best option? Or can I inject a "stop" message in my stream and
> let the pipeline shut down once it sees that?
>
> Can you please indicate if this is a valid way of doing this?
> Thanks.
>
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes <[email protected]> wrote:
>
>> Hi,
>>
>> I'm testing to see how I can build an integration test for a Beam
>> application that uses PubSub.
>>
>> So I start the Google provided PubSub emulator, create things like topic
>> and subscription, put in some validation messages and then run the job
>> against that and verify the data that comes out.
>> I'm logging the events to the screen and there I see the data coming in
>> and being processed.
>>
>> The problem I have is that I have not been able to figure out how to
>> cleanly terminate this stream after it has processed all my messages.
>>
>> I have also inserted some 'stop' messages to enable triggering a "we're
>> done, you can stop now".
>>
>> I've been digging through documentation and the apis and found nothing
>> that works.
>>
>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>
>> I have tried setting the timestamp of those to MAX_LONG and
>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>
>> So far I have not been able to figure out how to tell the TestPipeline:
>> Finish what you have and shut down.
>>
>> How do I do that?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
>
