Re: 自定义的sql connector在sql-cli中运行问题

2020-07-14 文章 admin
解决了,原因是我同时实现了createTableSink和createStreamTableSink导致
删掉createTableSink就可以了


> 2020年7月14日 上午10:50,admin <17626017...@163.com> 写道:
> 
> hi all,
> 我自定义了一个sql 
> connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
> 2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient 
>  [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
> update statement.
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
>at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> Caused by: scala.MatchError: null
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.Option.map(Option.scala:146) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
>  ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> 

自定义的sql connector在sql-cli中运行问题

2020-07-13 文章 admin
hi all,
我自定义了一个sql 
connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update 
statement.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: scala.MatchError: null
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.Option.map(Option.scala:146) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
 ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at