Re:Re: Flink sql case when problem

2021-06-22 Thread 纳兰清风
Hi JING ZHANG,


Thanks for your reply. I see now why the exception is thrown: I compared b, which is an INT column, against the VARCHAR literals '' and '0' instead of integers.
I had just assumed the Flink engine would do the implicit type conversion for me.
So for now, I'd better fix it the right way, e.g. select * from source_tb 
where b = 1 
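For the record, a sketch of the full corrected query along those lines (an assumption on my part: the empty-string branch has no INTEGER equivalent, so only the 0 case is kept; adjust as needed):

```sql
-- Keep the comparison on the INT side so no implicit
-- VARCHAR <-> INT conversion is needed in the CASE branches.
insert into sink_tb(a, c)
select a,
    case
        when b is null or b = 0 then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;
```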


Does the community intend to do something about implicit type conversion?

At 2021-06-18 15:19:54, "JING ZHANG"  wrote:

Hi houying,
The root cause of the `CodeGenException` is comparing INTEGER with VARCHAR (b is 
INTEGER, while '' and '0' are VARCHAR).
The problem could be solved by changing the type of b from INTEGER to VARCHAR.
Note that comparing INTEGER with VARCHAR may introduce other unexpected results. 
For example, in your demo above, for select * from source_tb where b = '1', the 
condition b = '1' would return false for records with b = 1.
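To make the pitfall concrete, a sketch against the original schema (b INTEGER); the second query is the safe form:

```sql
-- With b declared INTEGER, the VARCHAR literal '1' is not coerced here,
-- so this predicate does not match rows where b = 1:
select * from source_tb where b = '1';

-- Comparing against an integer literal behaves as expected:
select * from source_tb where b = 1;
```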


Now let's analyze why the two queries in your example lead to different results.
> Why is a `CodeGenException` thrown when using b = '' or b = '0'?
Calcite generates a `SEARCH` operator. When generating code for the 
expression, the left expression is b with result type IntType, while the right 
expressions are '' and '0' with result type CharType, so a `CodeGenException` is 
thrown.


> But it works well when I change the WHEN condition to b in ('', '0')
Because Calcite converts `IN` to `OR` when translating SQL to RelNode, since 
we set the conversion threshold to Integer.MAX_VALUE in Flink. 
After converting `IN` to `OR`, Calcite first explicitly casts '' and '0' to the 
type of b, then compares them to b.
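Conceptually, the rewrite described above produces something like the following (a sketch of the expanded predicate, not the literal plan output):

```sql
-- Original predicate:
--   b in ('', '0')
-- After Calcite's IN-to-OR expansion, with the literals
-- cast explicitly to b's type (INTEGER):
b = cast('' as integer) or b = cast('0' as integer)
```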
Please note that if the conversion threshold is lowered from Integer.MAX_VALUE 
to 1, the query also throws an exception when converting SQL to RelNode, like 
the following:



java.lang.ClassCastException: org.apache.calcite.util.NlsString cannot be cast to java.math.BigDecimal

    at org.apache.calcite.sql2rel.SqlToRelConverter.convertLiteralInValuesList(SqlToRelConverter.java:1759)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertRowValues(SqlToRelConverter.java:1685)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryOrInList(SqlToRelConverter.java:1620)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertExists(SqlToRelConverter.java:1603)
    at org.apache.calcite.sql2rel.SqlToRelConverter.substituteSubQuery(SqlToRelConverter.java:1170)
    at org.apache.calcite.sql2rel.SqlToRelConverter.replaceSubQueries(SqlToRelConverter.java:1063)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4185)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:687)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1056)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1025)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:302)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:640)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:291)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:678)



@Leonard

Do we need improvements in the Flink framework to solve this problem?
If so, we could create a JIRA ticket to track it and discuss how to implement 
the improvement.
BTW, there is another related issue that needs discussion: for select * from 
source_tb where b = '1', the condition b = '1' returns false for records with 
b = 1; this behavior differs from PostgreSQL and MySQL.
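For comparison, a sketch of the same predicate in PostgreSQL or MySQL (assuming an analogous table with an INTEGER column b), where the string literal is coerced to the column's type:

```sql
-- PostgreSQL and MySQL coerce '1' to the integer 1 here,
-- so rows with b = 1 are returned:
select * from source_tb where b = '1';
```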


Best regards,
JING ZHANG


Leonard Xu wrote on Thursday, June 17, 2021 at 8:11 PM:

Hi, houying


It looks like a bug in the code generation for the operator. Which Flink 
version are you using?
Could you help create a JIRA ticket?




Best,
Leonard

Re:Re: Flink sql case when problem

2021-06-17 Thread 纳兰清风
Hi Leonard Xu,

The version is 1.13. Is it a bug? I noticed that the type of column `b` is 
INTEGER, but I used it as VARCHAR.
What should the expected behavior be?




At 2021-06-17 20:11:24, "Leonard Xu"  wrote:

Hi, houying


It looks like a bug in the code generation for the operator. Which Flink 
version are you using?
Could you help create a JIRA ticket?




Best,
Leonard




On June 17, 2021, at 19:48, 纳兰清风 wrote:


Hello,


When I use a CASE WHEN statement in Flink SQL, I get the following error:


org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$3,isNull$3,,INT,None) and ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$4),false,,CHAR(0) NOT NULL,Some()), GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) str$5),false,,CHAR(1) NOT NULL,Some(0))).
    at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    ..


My SQL is:

create table source_tb (
    a varchar,
    b INTEGER
) with (
    'connector' = 'kafka',
    ...
);

create table sink_tb (
    a varchar,
    c varchar
) with (
    'connector' = 'console',
    'format' = 'rich-json'
);

insert into sink_tb(a, c)
select a,
    case
        when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;
But it works well when I change the WHEN condition to b is null or b in ('', '0').
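For completeness, the variant that works:

```sql
insert into sink_tb(a, c)
select a,
    case
        when b is null or b in ('', '0') then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;
```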


Does anyone have an idea about this?