Hi Kostas, Thank you for the suggestion. But in our case we want to do either a component test that involves several steps, where the CoFlatMap is one step in the middle, or integration test that test the whole flow, which involves also the CoFlatMap. And we trying to understand how to test such scenario so that results are predictable, and that elements from main stream arrive after elements from control stream, or other way around.
Thanks again, Tovi From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] Sent: יום ה 07 דצמבר 2017 19:11 To: Sofer, Tovi [ICG-IT] <ts72...@imceu.eu.ssmb.com> Cc: user@flink.apache.org Subject: Re: Testing CoFlatMap correctness Hi Tovi, What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like: TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = new TwoInputStreamOperatorTestHarness<>(myoperator); testHarness.setup(); testHarness.open(); testHarness.processWatermark1(new Watermark(17)); testHarness.processWatermark2(new Watermark(17)); testHarness.processElement1(new StreamRecord<>(5, 12L)); testHarness.processWatermark1(new Watermark(42)); testHarness.processWatermark2(new Watermark(42)); testHarness.processElement2(new StreamRecord<>("6", 13L)); and then use testHarness.getOutput() to get the output and compare it against the expected one. If you have access to the Flink source code, I would recommend you to have a look at the CoProcessOperatorTest for an example. Or you can find it here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dstreaming-2Djava_src_test_java_org_apache_flink_streaming_api_operators_co_CoProcessOperatorTest.java&d=DwMFaQ&c=j-EkbjBYwkAB4f8ZbVn1Fw&r=bfLStYBPfgr58eRbGoW11gp3x4kr3rJ99_MiSMX5oOs&m=gBef8R0NU-syKQC30s15-0u2EacsQc1Nc_-YiEJOKu8&s=JMo6NemjvMcOawmPTAuffrC8WfvZZppabhaJ8o5IJdY&e=> Hope this helps, Kostas On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <tovi.so...@citi.com<mailto:tovi.so...@citi.com>> wrote: Hi group, What is the best practice for testing CoFlatMap operator correctness? We have two source functions, each emits data to stream, and a connect between them, and I want to make sure that when streamA element arrive before stream element, a certain behavior happens. How can I test this case? Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); and emitting timestamp and watermark per element didn’t help, and still each element arrive in unexpected order. Thanks in advance, Tovi