Flink version: 1.12.0
在使用 Flink 执行 Flink SQL 流表 join 维表, 运行报错(流表SQL 和维表SQL单独运行都没有问题), 错误堆栈信息如下:
Exception in thread "main" java.lang.RuntimeException:
org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common
type of GeneratedExpression(field$18,isNull$17,,STRING,None) and
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)),
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:47)
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unable to
find common type of GeneratedExpression(field$18,isNull$17,,STRING,None) and
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)),
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
at
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:307)
at scala.Option.orElse(Option.scala:289)
at
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:307)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:190)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:38)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
at com.hmd.stream.SqlSubmit.callInsertInto(SqlSubmit.java:110)
at com.hmd.stream.SqlSubmit.callCommand(SqlSubmit.java:85)
at com.hmd.stream.SqlSubmit.run(SqlSubmit.java:71)
at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:45)
运行的 SQL 如下:
-- 流表 source kafka
-- Kafka source: order event stream (JSON), with a processing-time attribute
-- so it can drive a FOR SYSTEM_TIME AS OF lookup join against the JDBC dim table.
CREATE TABLE `t_Order_Order` (
    id              BIGINT,
    type            INT,
    amount          VARCHAR,
    receivedAmount  VARCHAR,
    channelType     VARCHAR,   -- NOTE(review): VARCHAR, but compared to INT literals downstream — compare as strings
    accountId       BIGINT,
    isCreditPeriod  VARCHAR,   -- NOTE(review): same VARCHAR-vs-INT concern
    isCyclePeriod   VARCHAR,   -- NOTE(review): same VARCHAR-vs-INT concern
    originalOrderId VARCHAR,
    status          VARCHAR,   -- NOTE(review): same VARCHAR-vs-INT concern
    insertTime      VARCHAR,
    `mark`          INT,
    isNegotiation   VARCHAR,
    statusTime1     VARCHAR,
    statusTime2     VARCHAR,
    statusTime4     VARCHAR,
    statusTime8     VARCHAR,
    statusTime16    VARCHAR,
    isFirstOrder    VARCHAR,
    isReturn        VARCHAR,
    orgCode         VARCHAR,
    proctime AS PROCTIME()     -- processing-time attribute required by the temporal join
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_Homedo_t_Order_Order',
    'properties.bootstrap.servers' = '10.0.15.130:9092',
    'properties.group.id' = 'test-homodo',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'json.ignore-parse-errors' = 'true'
);
-- 维表 dim_finance_account_fortest (JDBC lookup table, MySQL)
-- JDBC lookup (dimension) table backed by MySQL, cached per-task:
-- at most 5000 rows, refreshed after a 600s TTL.
CREATE TABLE `dim_finance_account_fortest` (
    `id`   BIGINT,
    `mark` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'table-name' = 't_finance_account_fortest',
    'url' = 'jdbc:mysql://10.0.0.29:3306/hmd_ods',
    -- NOTE(review): plaintext credentials embedded in SQL; consider externalizing them.
    'username' = 'read',
    'password' = 'si7v3#,a',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '600s'
);
-- sink
-- Sink table. Named `sink_kafka` but currently wired to the 'print' connector
-- (useful while debugging; swap the WITH clause to a Kafka sink for production).
CREATE TABLE `sink_kafka` (
    order_id                    BIGINT,
    account_id                  BIGINT,
    original_order_id           VARCHAR,
    org_code                    VARCHAR,
    order_type_id               INT,
    order_status_id             VARCHAR,
    order_status_name           VARCHAR,
    channel_type_id             VARCHAR,
    channel_type_name           VARCHAR,
    is_credit_order             INT,
    is_credit_period            VARCHAR,
    is_cycle_period             VARCHAR,
    is_negotiation              VARCHAR,
    is_firstorder               VARCHAR,
    is_return                   VARCHAR,
    order_gmv                   VARCHAR,
    -- NOTE(review): likely a typo for order_total_received_amount; renaming would
    -- change the emitted field name once a real Kafka sink is used, so left as-is.
    reder_total_received_amount VARCHAR,
    order_date                  VARCHAR,
    order_time                  VARCHAR,
    order_cancel_time           VARCHAR,
    order_complete_time         VARCHAR,
    order_pend_payment          VARCHAR,
    order_confirm_time          VARCHAR
) WITH (
    'connector' = 'print'
);
-- Enrich the filtered order stream via a processing-time lookup join against the
-- JDBC dimension table.
--
-- FIX(review): status / channelType / isCreditPeriod / isCyclePeriod are declared
-- VARCHAR in the source DDL but were compared against bare INT literals
-- (e.g. `oo.status = 1`, `CASE oo.channelType WHEN 4 ... WHEN 8 ...`). The Blink
-- planner cannot derive a common type for STRING vs INT NOT NULL when generating
-- the lookup-join projection — exactly the reported CodeGenException
-- ("Unable to find common type of GeneratedExpression(...STRING...) and
-- ArrayBuffer(... INT NOT NULL ... Some(4) ... Some(8))"). All such comparisons
-- now use string literals, so no cross-type coercion is needed.
-- Also removed a mail-client autolink artifact (`<http://dfat.id/>`) that had been
-- pasted after the join condition and broke parsing.
INSERT INTO `sink_kafka`
SELECT
    oo.id              AS order_id,
    oo.accountId       AS account_id,
    oo.originalOrderId AS original_order_id,
    oo.orgCode         AS org_code,
    oo.type            AS order_type_id,
    oo.status          AS order_status_id,
    CASE
        WHEN oo.status = '1'  THEN '交易取消'
        WHEN oo.status = '2'  THEN '交易完成'
        WHEN oo.status = '4'  THEN '等待客服处理'
        WHEN oo.status = '8'  THEN '等待客户付款'
        WHEN oo.status = '16' THEN '等待订单发出'
        WHEN oo.status = '32' THEN '等待客户收货'
        ELSE '其他'
    END AS order_status_name,
    oo.channelType AS channel_type_id,
    CASE oo.channelType
        WHEN '1'  THEN 'OMS'
        WHEN '2'  THEN 'PC'
        WHEN '4'  THEN 'M站'
        WHEN '8'  THEN 'APP'
        WHEN '16' THEN 'APP'    -- NOTE(review): 8 and 16 both map to 'APP' — confirm intentional
        WHEN '32' THEN '小程序'
        ELSE '其他'
    END AS channel_type_name,
    CASE
        WHEN oo.isCreditPeriod = '1' AND oo.isCyclePeriod = '1' THEN 1
        ELSE 0
    END AS is_credit_order,
    oo.isCreditPeriod AS is_credit_period,
    oo.isCyclePeriod  AS is_cycle_period,
    oo.isNegotiation  AS is_negotiation,
    oo.isFirstOrder   AS is_firstorder,
    oo.isReturn       AS is_return,
    oo.amount         AS order_gmv,
    oo.receivedAmount AS reder_total_received_amount,
    SUBSTR(oo.insertTime, 1, 10) AS order_date,   -- date prefix of insertTime
    SUBSTR(oo.insertTime, 1, 19) AS order_time,   -- date+time prefix of insertTime
    oo.statusTime1  AS order_cancel_time,
    oo.statusTime2  AS order_complete_time,
    CASE
        WHEN oo.isCreditPeriod = '1' AND oo.isCyclePeriod = '1' THEN oo.statusTime4
        ELSE oo.statusTime8
    END AS order_pend_payment,
    oo.statusTime16 AS order_confirm_time
FROM (
    -- Pre-filter the stream; keeps the lookup join from firing on excluded rows.
    SELECT
        id, type, amount, receivedAmount, channelType, accountId,
        isCreditPeriod, isCyclePeriod, originalOrderId, status, insertTime,
        isNegotiation, statusTime1, statusTime2, statusTime4, statusTime8,
        statusTime16, isFirstOrder, isReturn, orgCode, proctime
    FROM `t_Order_Order`
    WHERE mark > 0
) AS oo
-- Processing-time temporal join: each stream row looks up the dim row as of now.
LEFT JOIN `dim_finance_account_fortest` FOR SYSTEM_TIME AS OF oo.proctime AS dfat
    ON oo.accountId = dfat.id;