Hi Marjan,

That's rather weird, because PyFlink uses the same MATCH_RECOGNIZE
implementation. Could you file a Jira ticket? If not, let me know and I'll
create one for you.
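In the meantime, a self-contained reproduction usually makes a ticket much
easier to triage. Below is a minimal sketch of what you could attach; it
substitutes the built-in datagen connector for your custom lokiquery
connector (which others can't run), and the SIMILAR TO patterns are just
placeholders:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Same environment settings as in your report below.
env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

# The built-in datagen connector stands in for the custom lokiquery
# connector so the reproduction has no external dependencies.
t_env.execute_sql("""
    CREATE TABLE Source (
        ts TIMESTAMP(3),
        log_line STRING
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
    )
""")

# Planning is what fails in the SQL Client, so the interesting part is
# whether this statement plans at all; the placeholder patterns are not
# expected to actually match the random datagen strings.
t_env.execute_sql("""
    SELECT *
    FROM Source
    MATCH_RECOGNIZE (
        ORDER BY ts
        MEASURES
            START_ROW.ts AS start_ts,
            END_ROW.ts AS end_ts
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST END_ROW
        PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
        DEFINE
            START_ROW AS START_ROW.log_line SIMILAR TO '%a%',
            MID_ROW AS MID_ROW.log_line SIMILAR TO '%b%',
            END_ROW AS END_ROW.log_line SIMILAR TO '%c%'
    ) MR
""").print()

If that sketch plans fine in batch mode from PyFlink while the equivalent
statements fail in the SQL Client, that contrast would be useful to mention
in the ticket.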
Best regards,

Martijn

On Thu, Dec 22, 2022 at 11:37 AM Marjan Jordanovski <mjmarjan1...@gmail.com>
wrote:

> Hello,
>
> I am using a custom-made connector to create a Source table in this way:
>
> create table Source (
>     ts TIMESTAMP(3),
>     instance STRING,
>     service STRING,
>     logdatetime STRING,
>     threadid STRING,
>     level STRING,
>     log_line STRING
> ) with (
>     'connector'='lokiquery',
>     'host'='<lokiurl>',
>     'lokiqueryparamsstring'='query={instance="test",
> service="test"}&limit=5000&start=2022-12-15T16:40:09.560Z&end=2022-12-15T16:58:09.570Z'
> );
>
> In this table I successfully store data from the specified time range from
> Loki. The data comes in as a batch (not as a stream).
>
> Then I want to run a query that looks for patterns in the log_line column
> of the Source table. I am doing the following:
>
> SELECT *
> FROM Source
> MATCH_RECOGNIZE (
>     ORDER BY ts
>     MEASURES
>         START_ROW.ts AS start_ts,
>         END_ROW.ts AS end_ts
>     ONE ROW PER MATCH
>     AFTER MATCH SKIP TO LAST END_ROW
>     PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
>     DEFINE
>         START_ROW AS START_ROW.log_line SIMILAR TO
> '%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%',
>         MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse -
> DSResponse: List with%',
>         END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%'
> ) MR;
>
> When using Python's PyFlink, this works just fine! But when I try the same
> thing in the Flink SQL CLI, I get a strange error after executing the
> second statement:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties: convention=LOGICAL,
> FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is LogicalMatch[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant
> part of the original plan is as follows
>
> 167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]],
> outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO
> LAST(_UTF-16LE'END_ROW')],
> pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false),
> PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)),
> PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)),
> PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))],
> isStrictStarts=[false], isStrictEnds=[false], subsets=[[]],
> patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0),
> _UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'),
> SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List
> with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]],
> inputFields=[[ts, instance, service, logdatetime, threadid, level,
> log_line]])
>
> 1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]],
> table=[[default_catalog, default_database, Source]])
>
> In Python, where this works, these are the only settings I use for the
> table environment (of course I also include the jar for my custom
> connector):
>
> env_settings = EnvironmentSettings.in_batch_mode()
> t_env = TableEnvironment.create(env_settings)
> t_env.get_config().get_configuration().set_string("parallelism.default",
> "1")
>
> Therefore I set the equivalent values in the Flink SQL CLI:
>
> SET 'execution.runtime-mode' = 'batch';
> SET 'parallelism.default' = '1';
>
> But it didn't help. Does anyone have any idea what could be causing this
> issue?
>
> Thank you,
> Marjan