Hi there, 

I have a pure DataStream pipeline (i.e. no higher-level APIs like Table or SQL 
are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE 
expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and 
arbitrarily nested AND/OR constructs. 

I was wondering if I could somehow transform a SQL WHERE expression into a 
Flink FilterFunction? 
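
To make the idea concrete, here is a minimal sketch of what a hand-built equivalent of such a filter might look like in plain Java. It does not parse a SQL string; it only shows how AND/OR/REGEXP conditions could be composed as predicates. The class WhereSketch, the Person type (a stand-in for SomePOJO with only the two fields from the example) and all helper names are hypothetical, and treating REGEXP as a partial match is an assumption.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class WhereSketch {

    // Hypothetical stand-in for SomePOJO; only the two fields used in the example.
    public static final class Person {
        public final String user;
        public final int age;

        public Person(String user, int age) {
            this.user = user;
            this.age = age;
        }
    }

    // Corresponds to: user = '<name>'
    public static Predicate<Person> userEquals(String name) {
        return p -> name.equals(p.user);
    }

    // Corresponds to: age > <bound>
    public static Predicate<Person> ageGreaterThan(int bound) {
        return p -> p.age > bound;
    }

    // Corresponds to a REGEXP condition on user.
    // Assumption: true if any substring of user matches the regex.
    public static Predicate<Person> userMatches(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return p -> p.user != null && pattern.matcher(p.user).find();
    }

    // Corresponds to: user = 'pitter' AND age > 10
    public static Predicate<Person> example() {
        return userEquals("pitter").and(ageGreaterThan(10));
    }

    public static void main(String[] args) {
        Predicate<Person> where = example();
        System.out.println(where.test(new Person("pitter", 11)));
        System.out.println(where.test(new Person("pitter", 10)));
    }
}
```

Arbitrary nesting would fall out of chaining and(...) and or(...) on these predicates. Wrapping such a predicate in a FilterFunction would keep it inside the DataStream API, but the predicate would have to be serializable, and the SQL string would still need to be parsed by something.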

My approach right now is to register my stream as a table, run a SQL query on 
it, and convert the result back to a DataStream, like so: 

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<SomePOJO> data = createPOJOTestData();
DataStream<SomePOJO> stream = env.fromCollection(data);

//final Table asTable = tEnv.fromDataStream(stream);
//Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL style 'AND' possible here...

tEnv.registerDataStream("SAMPLE", stream);
Table filteredTable = tEnv.sqlQuery(
    "SELECT * FROM SAMPLE WHERE user = 'pitter' AND age > 10");

stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
//... test assertions

It feels a bit odd that I need to go all the way up to the SQL API and 
register a table "just" to apply a WHERE clause, and that, having left the 
DataStream world, I can no longer assign a uid or operator name to this 
operator. 

Is the way I wrote it the best approach, or do you have a better idea? 
Are there any caveats here? Note that I deliberately didn't assign an event 
time column, as I know that it's just a WHERE without any windowing etc., and I 
wanted to test that it still works without an explicit time column :) 

Best regards 
Theo 
