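Oops, I think I explained something wrong in my previous email. B means "not A", so after a completed match there can be no new partial match starting from the B row. If AVG(A.price) < 15 held for the A rows but fails once the B row is added, that row's price must be above 15, so A's condition also fails on that row alone. There's nothing wrong with the implementation; it's the example in [2] that is wrong.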
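Here is a minimal Python sketch of this reading. It assumes the pattern in [2] is `PATTERN (A+ B)` with `DEFINE A AS AVG(A.price) < 15`, and that B matches exactly the rows that fail A. The helper names (`satisfies_a`, `match_from`) are hypothetical. The sketch tries one match start at a time and does not model how Flink's CEP operator runs concurrent partial matches or prunes them.

```python
# Assumed pattern from [2]: PATTERN (A+ B), DEFINE A AS AVG(A.price) < 15,
# with B read as "a row that fails A" (this thread's reading, not Flink's NFA).

# Prices of the ACME rows in [2], in rowtime order (10:00:00 .. 10:00:10).
PRICES = [12.0, 17.0, 13.0, 16.0, 25.0, 2.0, 4.0, 10.0, 15.0, 25.0, 30.0]


def satisfies_a(a_prices):
    """A's condition, evaluated over all rows mapped to A so far."""
    return sum(a_prices) / len(a_prices) < 15


def match_from(start, prices=PRICES):
    """Try greedy A+ B starting at index `start` (hypothetical helper).

    Returns (a_indices, b_index) for a completed match, or None if the
    first row already fails A or the input ends while still inside A+.
    """
    a_rows = []
    for i in range(start, len(prices)):
        if satisfies_a([prices[j] for j in a_rows] + [prices[i]]):
            a_rows.append(i)          # greedy: keep extending A+
        elif a_rows:
            return a_rows, i          # row i fails A, so it becomes B
        else:
            return None               # row i cannot even start A
    return None


first = match_from(0)
print(first)                                          # ([0, 1, 2, 3], 4)
a_rows, b_index = first
print(sum(PRICES[i] for i in a_rows) / len(a_rows))   # 14.5
# AFTER MATCH SKIP TO FIRST B: the next attempt starts at the B row,
# but AVG over [25.0] is 25, which is not < 15, so no match starts there.
print(match_from(b_index))                            # None
```

Under that reading, the first match covers the 12.0, 17.0, 13.0 and 16.0 rows with an average of 14.5. This matches the first row in [2]. No match can begin at the 25.0 row that AFTER MATCH SKIP TO FIRST B jumps to.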
Am I right?

Best,
Dongwon

On Thu, Sep 5, 2019 at 9:15 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi,
>
> I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1].
> While testing the query in [2] myself, I got a different result
> from the one shown in [2].
> The query result from [2] is as follows:
>
> symbol     start_tstamp        end_tstamp          avgPrice
> =========  ==================  ==================  ============
> ACME       01-APR-11 10:00:00  01-APR-11 10:00:03  14.5
> ACME       01-APR-11 10:00:04  01-APR-11 10:00:09  13.5
>
> The result from the attached Maven project (which only contains a
> sample program that executes the query in [2]) is as follows:
>
>> ACME,1970-01-01 00:00:01.0,1970-01-01 00:00:04.0,14.5
>
> There's just one entry, not two.
> (As you might notice, the timestamp of the first record in the attached
> Maven project is 1970-01-01 00:00:01 for testing. The other numbers are
> the same.)
>
> I dug into the internal implementation of CepOperator and got the
> following trace:
>
> 1. INPUT : ACME,1000,12.0,1
> 2. PARTIAL MATCH : [A*1]
> 3.
> 4. INPUT : ACME,2000,17.0,2
> 5. PARTIAL MATCH : [A*2]
> 6.
> 7. INPUT : ACME,3000,13.0,1
> 8. PARTIAL MATCH : [A*3]
> 9. PARTIAL MATCH : [A*1]
> 10.
> 11. INPUT : ACME,4000,16.0,3
> 12. PARTIAL MATCH : [A*4]
> 13. PARTIAL MATCH : [A*2]
> 14.
> 15. *INPUT : ACME,5000,25.0,2*
> 16. *COMPLETED MATCH : [A*4, B*1]*
> 17.
> 18. INPUT : ACME,6000,2.0,1
> 19. PARTIAL MATCH : [A*1]
> 20.
> 21. INPUT : ACME,7000,4.0,1
> 22. PARTIAL MATCH : [A*2]
> 23. PARTIAL MATCH : [A*1]
> 24.
> 25. INPUT : ACME,8000,10.0,2
> 26. PARTIAL MATCH : [A*3]
> 27. PARTIAL MATCH : [A*2]
> 28. PARTIAL MATCH : [A*1]
> 29.
> 30. INPUT : ACME,9000,15.0,2
> 31. PARTIAL MATCH : [A*4]
> 32. PARTIAL MATCH : [A*3]
> 33. PARTIAL MATCH : [A*2]
> 34.
> 35. INPUT : ACME,10000,25.0,2
> 36. PARTIAL MATCH : [A*5]
> 37. PARTIAL MATCH : [A*4]
> 38.
> 39. INPUT : ACME,11000,30.0,1
> 40. PARTIAL MATCH : [A*6]
>
> My observation is that when "ACME,5000,25.0,2" comes in (line 15), we get
> a completed match (line 16) but no partial match starting from it (which
> would be [A*1] in my notation).
> According to the definition of "AFTER MATCH SKIP TO FIRST B", since
> "ACME,5000,25.0,2" is B, a new match should start from "ACME,5000,25.0,2".
> However, in the trace above a new match starts from the next row
> (lines 18 and 19).
> Therefore, when the last row "ACME,11000,30.0,1" comes in, the average at
> that point is 14.3 (= (2+4+10+15+25+30)/6), which is less than 15,
> so "ACME,11000,30.0,1" belongs to A, not to B as shown in the example.
>
> Is it a bug, or did I miss something conceptually?
>
> P.S. How do you load rows from a local CSV file with a rowtime attribute
> configured? I don't like the way I implemented the custom table source in
> the attached file, which I use for testing.
>
> Best,
> Dongwon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#aggregations
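The following is a quick arithmetic check of the averages discussed in the quoted message. It is a hypothetical Python sketch over the prices from [2], not Flink code. It prints AVG(A.price) as each row from 2.0 onward joins the partial match that starts right after the B row. It then recomputes the 13.5 shown for the second match in [2].

```python
# Prices of the ACME rows in [2], in rowtime order (10:00:00 .. 10:00:10).
PRICES = [12.0, 17.0, 13.0, 16.0, 25.0, 2.0, 4.0, 10.0, 15.0, 25.0, 30.0]


def running_averages(prices):
    """Return AVG(price) after each row is added, in order (hypothetical helper)."""
    averages = []
    total = 0.0
    for count, price in enumerate(prices, start=1):
        total += price
        averages.append(total / count)
    return averages


# Partial match starting at 2.0 (index 5), the row after the B row 25.0.
# Every average stays below 15, ending at 86/6 = 14.33 when 30.0 arrives,
# so 30.0 is still mapped to A and the match stays open as [A*6].
for price, avg in zip(PRICES[5:], running_averages(PRICES[5:])):
    print(f"price={price:5.1f}  avg={avg:6.2f}  A holds: {avg < 15}")

# The second row in [2] (10:00:04 .. 10:00:09, avgPrice 13.5) only works out
# if the 25.0 row is counted as the first A row, although AVG over [25.0] is 25.
print(sum(PRICES[4:10]) / 6)  # 13.5
```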