Hi Till,
To answer your first question, I currently don't (and honestly am not sure how, 
other than using a breakpoint in the IDE, or perhaps something like Mockito 
can do it). So did I interpret it correctly that the execution environment 
started using flink-test-utils will essentially tear down once it consumes the 
last data point (i.e. the end of the collection I am passing), even though there 
could be active timers registered? 
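To make the question concrete, here is a plain-Java analogy of what I think is happening. It is only a sketch and does not use any Flink API: BoundedInputTimerDemo and timerFired are hypothetical names, and a ScheduledExecutorService stands in for the timer service. It assumes my reading above is right, i.e. that pending timers are simply dropped when the job finishes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java analogy only; none of this is Flink API.
class BoundedInputTimerDemo {

    // Registers one timeout, consumes a small bounded input, then tears down.
    // Returns true if the timeout callback ran before teardown completed.
    static boolean timerFired(long timerDelayMs, boolean drainTimers) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), timerDelayMs, TimeUnit.MILLISECONDS);

        // The bounded "collection" is consumed almost instantly.
        int processed = 0;
        for (int element : new int[] {1, 2, 3}) {
            processed += 1;
        }

        try {
            if (drainTimers) {
                // Let already-registered timers run before shutting down.
                timers.shutdown();
            } else {
                // Tear down immediately; pending timers are discarded.
                timers.shutdownNow();
            }
            timers.awaitTermination(timerDelayMs + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println("immediate teardown, fired=" + timerFired(200, false));
        System.out.println("drain timers first, fired=" + timerFired(200, true));
    }
}
```

With immediate teardown the timeout callback should never run, which matches what I see in the test. The analogy says nothing about Flink's threading model, only about the ordering of "input finished" and "timer due".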
Further, most of our pipelines use low-level process functions. We toyed 
around with other windowing and session functions, but process functions gave 
us the most flexibility (at least at this point, until we can revisit), 
and we generate the keys for aggregation/windowing somewhere upstream (say in a 
map, flatMap or another process function). This means some pipelines are 
agnostic to event time vs. processing time in a sense, although technically we 
do register timers within the process functions. This helped us with unbounded 
keys and with sensor data that could potentially be backfilled (i.e. the 
watermarks passed long ago). I don't doubt there are probably better 
solutions :)
With that background, I am not quite following your second note about event 
time and how we can leverage it for testing. Our intent is to create sampled 
input from results and compare the test output to those results (i.e. end-to-end 
integration tests) as part of our CI/CD. The normal flow seems to work well; 
only the "negative" test cases for timeouts are a mystery right now :) So 
single-operator harnesses don't sound like the right approach. Let me know 
otherwise.
Thanks,

    On Friday, September 14, 2018, 11:42:17 AM EDT, Till Rohrmann 
<trohrm...@apache.org> wrote:  
 
 Hi Ashish,
how do you make sure that all of your data is not consumed within a fraction of 
the 2 seconds? For this it would be better to use event time which allows you 
to control how time passes. If you want to test a specific operator you could 
try out the One/TwoInputStreamOperatorTestHarness.
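
As a rough illustration of what "controlling how time passes" means, here is a minimal plain-Java sketch. It is not Flink code: ManualClockDemo and ManualTimerService are hypothetical names, and the clock only moves when the test says so. The operator test harnesses follow the same idea through calls such as setProcessingTime(...), but the sketch below does not depend on them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a timer service driven by the test; not Flink API.
class ManualClockDemo {

    static final class ManualTimerService {
        private long now = 0L;
        // Firing time -> keys whose timeout is due at that time.
        private final TreeMap<Long, List<String>> timers = new TreeMap<>();
        private final List<String> fired = new ArrayList<>();

        void registerTimer(String key, long fireAt) {
            timers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(key);
        }

        // Moves the clock forward and fires every timer that is now due, in time order.
        void advanceTo(long newTime) {
            now = Math.max(now, newTime);
            while (!timers.isEmpty() && timers.firstKey() <= now) {
                Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
                for (String key : due.getValue()) {
                    fired.add(key + "@" + due.getKey());
                }
            }
        }

        List<String> fired() {
            return fired;
        }
    }

    public static void main(String[] args) {
        ManualTimerService timers = new ManualTimerService();
        long timeoutMs = 2000L;

        // An element arriving at t=0 registers a timeout 2 seconds later.
        timers.registerTimer("sensor-a", 0L + timeoutMs);

        timers.advanceTo(1999L);
        System.out.println("before timeout: " + timers.fired());

        timers.advanceTo(2000L);
        System.out.println("after timeout:  " + timers.fired());
    }
}
```

Because the test owns the clock, the timeout path can be exercised deterministically, no matter how quickly the input collection is consumed.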
Cheers,
Till
On Fri, Sep 14, 2018 at 3:36 PM ashish pok <ashish...@yahoo.com> wrote:

All,
Hopefully a quick one. I feel like I have seen this answered a few times 
before but can't find an appropriate example. I am trying to run a few tests 
where registered timeouts are invoked (snippet below). The simple example shown 
in the documentation for integration tests (using flink-test-utils) seems to 
complete even though timers are registered and have not been invoked. 

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
        .process(new TupleProcessFn())
        .keyBy(FactTuple::getKey)
        .process(new NormalizeDataProcessFn(2))
        .addSink(getSink());

    env.execute();

I have a 2-second processing-time timer registered. If I put a breakpoint in the 
first TupleProcessFn() after a few tuples are collected, I can see onTimer being 
invoked. So what is the trick here? I went as far as putting in a MapFunction 
with a sleep after the second process function, to no avail.
Thanks,
Ashish 
  
