Re: Testing RichAsyncFunction with TestHarness

2020-04-02 Thread KristoffSC
Thanks. I would suggest adding my "tutorial" about using testHarness for AsyncOperators to the documentation, or maybe building something based on this use case that could be helpful for others in the future :) Thanks, Krzysztof -- Sent from: http://apache-flink-user-mailing-list-archive.2336050

Re: Testing RichAsyncFunction with TestHarness

2020-03-30 Thread Gary Yao
> > Additionally even though I add all necessary dependencies defined in [1] I > cannot see ProcessFunctionTestHarnesses class. > That class was added in Flink 1.10 [1]. [1] https://github.com/apache/flink/blame/f765ad09ae2b2aa478c887b988e11e92a8b730bd/flink-streaming-java/src/test/java/org/apach

Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi :) I have finally figured it out :) On top of the changes from my last email, in my test method I had to wrap "testHarness.processElement" in a synchronized block, like this: @Test public void foo() throws Exception { synchronized (this.testHarness.getCheckpointLock()) { testHarness.proce
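For readers who want to see the locking pattern in isolation, here is a minimal sketch that does not use Flink at all. `FakeHarness` is a hypothetical stand-in written for this illustration, and the check inside its `processElement` is an assumption about why the caller has to hold the lock; it is not a description of the real harness or operator.

```java
public class CheckpointLockSketch {

    // Hypothetical stand-in for a test harness; NOT Flink's class.
    static class FakeHarness {
        private final Object checkpointLock = new Object();
        private int processed = 0;

        Object getCheckpointLock() {
            return checkpointLock;
        }

        // Assumption for illustration: the element path expects the caller
        // to already hold the checkpoint lock and fails otherwise.
        void processElement(String value) {
            if (!Thread.holdsLock(checkpointLock)) {
                throw new IllegalStateException("caller must hold the checkpoint lock");
            }
            processed++;
        }

        int getProcessed() {
            return processed;
        }
    }

    public static void main(String[] args) {
        FakeHarness harness = new FakeHarness();

        // Without the lock the call is rejected.
        try {
            harness.processElement("a");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Wrapped in a synchronized block on the same lock, it goes through.
        synchronized (harness.getCheckpointLock()) {
            harness.processElement("a");
        }
        System.out.println("processed: " + harness.getProcessed());
    }
}
```

The point of the sketch is only the shape of the test method: take the lock the harness exposes, then feed the element inside that block.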

Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi, another update on this one. I managed to make the workaround a little bit cleaner. The test setup I have now is like this: ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream(); ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes); oosStreamEd
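The general idea of this setup, writing a serialized object into the config as bytes so that a later lookup by key no longer returns null, can be sketched with the standard library only. In the sketch below a plain `Map<String, byte[]>` is a hypothetical stand-in for the operator configuration, the helper names are made up for illustration, and the empty list is just a placeholder value (the message above is cut off before it shows what was actually written).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ConfigBytesSketch {

    // Serialize a value to bytes with ObjectOutputStream.
    static byte[] toBytes(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical lookup: returns null when the key has no bytes stored,
    // otherwise deserializes the stored object.
    static Object readObject(Map<String, byte[]> config, String key) {
        byte[] bytes = config.get(key);
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> config = new HashMap<>();

        // Key missing: the lookup yields null.
        System.out.println(readObject(config, "edgesInOrder")); // prints "null"

        // Placeholder value stored under the key: the lookup now succeeds.
        config.put("edgesInOrder", toBytes(new ArrayList<String>()));
        System.out.println(readObject(config, "edgesInOrder")); // prints "[]"
    }
}
```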

Re: Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
I think I got this to work, although with a "nasty" workaround. While debugging, I found that the configuration for this testHarness operator was missing two entries: "edgesInOrder" and "typeSerializer_in_1". I added conditional breakpoints to the InstantiationUtil.readObjectFromConfig method for those two keys and I ran

Re: Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
I've debugged it a little bit and found that it fails in the InstantiationUtil.readObjectFromConfig method when we execute byte[] bytes = config.getBytes(key, (byte[])null); This returns null. The key that it is looking for is "edgesInOrder". In the config map, there are only two entries though. For