Hello Jing,

Regarding the conversion (from 'IN' to 'OR') threshold, could you kindly explain it in more detail? Is it the count of items in the 'IN' clause?
BR,
Jacky

________________________________
From: JING ZHANG <beyond1...@gmail.com>
Sent: Friday, June 18, 2021 15:19
To: Leonard Xu <xbjt...@gmail.com>
Cc: 纳兰清风 <houying910...@163.com>; User-Flink <user@flink.apache.org>
Subject: Re: Flink sql case when problem

Hi houying,

The root cause of the `CodeGenException` is comparing INTEGER with VARCHAR (b is INTEGER, while '' and '0' are VARCHAR). The problem can be solved by changing the type of b from INTEGER to VARCHAR. Note that comparing INTEGER with VARCHAR may introduce other unexpected results. For example, in your demo above, for select * from source_tb where b = '1', the condition b = '1' would return false for records with b = 1.

Now let's analyze why the two queries in your example lead to different results.

> Why is a `CodeGenException` thrown when using b = '' or b = '0'?

Calcite generates a `SEARCH` operator. When generating code for the expression, the left expression is b, whose result type is IntType; the right expressions are '' and '0', whose result types are CharType. So a `CodeGenException` is thrown.

> But it works well when I change the when statement to b in ('','0')

Because Calcite converts `IN` to `OR` when converting SQL to RelNode, since Flink sets the conversion threshold to Integer.MAX_VALUE. After converting `IN` to `OR`, Calcite first casts '' and '0' to the type of b explicitly, then compares them with b.
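If it helps to visualize the `IN`-to-`OR` conversion described above, the predicate would be rewritten roughly as follows. This is only a sketch of the equivalent SQL, not the actual RexNode plan Calcite produces, and whether CAST('' AS INTEGER) succeeds or yields an error/NULL at runtime depends on the runtime cast rules:

```sql
-- Original predicate:
--   b in ('', '0')
-- After Calcite's IN-to-OR conversion, the string literals are first
-- cast explicitly to b's type (INTEGER), then compared, roughly:
SELECT *
FROM source_tb
WHERE b = CAST('' AS INTEGER)
   OR b = CAST('0' AS INTEGER);
```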
Please note that if the conversion threshold is updated from Integer.MAX_VALUE to 1, the query also throws an exception when converting SQL to RelNode, like the following:

java.lang.ClassCastException: org.apache.calcite.util.NlsString cannot be cast to java.math.BigDecimal
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertLiteralInValuesList(SqlToRelConverter.java:1759)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertRowValues(SqlToRelConverter.java:1685)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryOrInList(SqlToRelConverter.java:1620)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertExists(SqlToRelConverter.java:1603)
    at org.apache.calcite.sql2rel.SqlToRelConverter.substituteSubQuery(SqlToRelConverter.java:1170)
    at org.apache.calcite.sql2rel.SqlToRelConverter.replaceSubQueries(SqlToRelConverter.java:1063)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4185)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:687)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1056)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1025)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:302)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:640)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:291)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:678)

@Leonard Do we need to add improvements in the Flink framework to solve this problem? If yes, we could create a JIRA ticket to track it and discuss how to implement the improvement.

BTW, there is another related issue which needs discussion: for select * from source_tb where b = '1', the condition b = '1' would return false for records with b = 1; this behavior differs from Postgres and MySQL.

Best regards,
JING ZHANG

Leonard Xu <xbjt...@gmail.com> wrote on Thursday, June 17, 2021, 8:11 PM:

Hi, houying

It looks like a bug when generating the operator code. Which Flink version are you using? Could you help create a JIRA ticket?

Best,
Leonard

On June 17, 2021, at 19:48, 纳兰清风 <houying910...@163.com> wrote:

Hello,

When I use a case when statement in Flink SQL, I get the following error:

org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$3,isNull$3,,INT,None) and ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$4),false,,CHAR(0) NOT NULL,Some()), GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$5),false,,CHAR(1) NOT NULL,Some(0))).
    at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    ......

My SQL is:

create table source_tb (
    a varchar,
    b INTEGER
) with (
    'connector' = 'kafka',
    ...
);

create table sink_tb (
    a varchar,
    c varchar
) with (
    'connector' = 'console',
    'format' = 'rich-json'
);

insert into sink_tb(a, c)
select a,
    case
        when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;

But it works well when I change the when statement to b is null or b in ('', '0').

Does anyone have an idea about this?
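Following JING's suggestion upthread, one way to make the original query compile is to declare b as VARCHAR so both sides of every comparison share a type. A sketch of the adjusted DDL and query (connector options elided as in the original; note the downstream cast to bigint still assumes b holds numeric text):

```sql
create table source_tb (
    a varchar,
    b varchar  -- was INTEGER; now matches the VARCHAR literals '' and '0'
) with (
    'connector' = 'kafka',
    ...
);

insert into sink_tb(a, c)
select a,
    case
        when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;
```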