Hi, Daniel Henneberger. 
Thanks for reporting. It seems a bug to me. Could you please help create a 
Jira[1] for it? 
As a workaround, is it possible not to use UNNEST? May be you can try to use 
EXPLODE function for the Flink planner will rewrites UNNEST to explode function 
in implementation[2]. 

[1] https://issues.apache.org/jira/projects/FLINK/issues/ 
[2] 
https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
 

Best regards, 
Yuxia 


发件人: "Daniel Henneberger" <henneberger.dan...@gmail.com> 
收件人: "User" <user@flink.apache.org> 
发送时间: 星期三, 2023年 2 月 22日 上午 5:35:02 
主题: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API 

Dear Apache Flink community, 

I could use some help with a serialization issue I'm having while using the 
Table API. Specifically, I'm trying to deserialize a serialized CompiledPlan, 
but I'm running into trouble with the UNNEST_ROWS operation. It seems that the 
CompilePlan deserializer isn't looking up any functions in the 
BuiltInFunctionDefinitions class, which is causing the de-serialization to 
fail. 

Do any of you have experience with this issue or know of a workaround for 
serializing a Table API plan? 

Below is code to replicate. 

Thanks, 
Daniel Henneberger 

private void test () { 
EnvironmentSettings settings = EnvironmentSettings . newInstance 
().inStreamingMode().build(); 
TableEnvironment tEnv = TableEnvironment . create ( settings ); 

// Create a table of values 
Table table = tEnv .fromValues(createNestedDatatype(), 
Row . of ( List . of ( Row . of ( "nested" )), "name" )); 
tEnv .createTemporaryView( "table1" , table ); 

// Invoke the unnest operation 
Table unnested = tEnv .sqlQuery( "SELECT name, nested \n " 
+ "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)" ); 

StatementSet statementSet = tEnv .createStatementSet(); 
statementSet .addInsert( TableDescriptor . forConnector ( "print" ).build(), 
unnested ); 

// Serialize the plan 
CompiledPlan plan = statementSet .compilePlan(); 
String json = plan .asJsonString(); 

// Attempt to load the plan 
// This fails with the error 'Could not resolve internal system function 
'$UNNEST_ROWS$1'. This is a bug, please file an issue.' 
CompiledPlan plan2 = tEnv .loadPlan( PlanReference . fromJsonString ( json )); 
plan2 .execute().print(); 
} 

private DataType createNestedDatatype () { 
return DataTypes . ROW ( 
DataTypes . FIELD ( "arr" , DataTypes . ARRAY ( DataTypes . ROW ( 
DataTypes . FIELD ( "nested" , DataTypes . STRING ()) 
))), 
DataTypes . FIELD ( "name" , DataTypes . STRING ())); 
} 

Reply via email to