Re: 自定义的sql connector在sql-cli中运行问题
解决了,原因是我同时实现了createTableSink和createStreamTableSink导致的,删掉createTableSink就可以了。 > 2020年7月14日 上午10:50,admin <17626017...@163.com> 写道: > > hi all, > 我自定义了一个sql > connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下 > 2020-07-14 10:36:29,148 WARN org.apache.flink.table.client.cli.CliClient > [] - Could not execute SQL statement. > org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL > update statement. >at > org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251] >at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > Caused by: scala.MatchError: null >at > org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) > 
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.Option.map(Option.scala:146) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.Iterator$class.foreach(Iterator.scala:891) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at scala.collection.AbstractTraversable.map(Traversable.scala:104) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) > ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] >at >
自定义的sql connector在sql-cli中运行问题
hi all, 我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下 2020-07-14 10:36:29,148 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] Caused by: scala.MatchError: null at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] 
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.Option.map(Option.scala:146) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at