Hi there, I have a pure DataStream pipeline (i.e. no higher-level APIs like Table or SQL are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs.
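For illustration only, here is a minimal plain-Java sketch of what the example condition looks like when written by hand as a composed predicate. It deliberately uses no Flink classes. The class name WhereAsPredicate and the helper matches are hypothetical, and the fields user and age on the stand-in SomePOJO are assumed from the query above.

```java
import java.util.function.Predicate;

public class WhereAsPredicate {

    // Hypothetical stand-in for SomePOJO; the fields are assumed from the query.
    public static class SomePOJO {
        public String user;
        public int age;

        public SomePOJO() {}

        public SomePOJO(String user, int age) {
            this.user = user;
            this.age = age;
        }
    }

    // user = 'pitter'
    static final Predicate<SomePOJO> USER_IS_PITTER = p -> "pitter".equals(p.user);

    // age > 10
    static final Predicate<SomePOJO> AGE_OVER_10 = p -> p.age > 10;

    // user = 'pitter' AND age > 10
    public static final Predicate<SomePOJO> WHERE = USER_IS_PITTER.and(AGE_OVER_10);

    // Hypothetical convenience wrapper, used only for the demo below.
    public static boolean matches(String user, int age) {
        return WHERE.test(new SomePOJO(user, age));
    }

    public static void main(String[] args) {
        System.out.println(matches("pitter", 11));
        System.out.println(matches("pitter", 10));
        System.out.println(matches("theo", 42));
    }
}
```

Nested OR and NOT conditions compose the same way via Predicate.or and Predicate.negate. This only covers a condition known at compile time, not one supplied as a SQL string.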
I was wondering whether I could somehow transform a SQL WHERE expression into a Flink FilterFunction. My current approach is to register my stream as a table, run a SQL query on it, and convert the result back to a DataStream, like so:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    List<SomePOJO> data = createPOJOTestData();
    DataStream<SomePOJO> stream = env.fromCollection(data);

    //final Table asTable = tEnv.fromDataStream(stream);
    //Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL style 'AND' possible here...

    tEnv.registerDataStream("SAMPLE", stream);
    Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE user = 'pitter' AND age > 10");
    stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);

    List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
    //... test assertions

It feels a bit weird that I need to go all the way up to the SQL API and register a table "just" to apply a WHERE clause, and that, having left the DataStream world, I can no longer assign a uid or operator_name to this operator.

Is the way I wrote it the best approach, or do you have a better idea? Are there any caveats here?

Note that I deliberately didn't assign the event time column: I know that it's just a WHERE without any windowing etc., and I wanted to test that it still works without an explicit time column :)

Best regards
Theo