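Hi,
I am using Flink to consume data from Kafka and write it into Hive. The configuration and connections are all fine, but when I actually execute insert into xxx_table, the exception below is thrown. I cannot work out the cause from the stack trace and would appreciate any pointers.
cc @Jingsong Li @Jark Wu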




org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
    at com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
