Hi, Batch模式来支持Kafka -> Hive,也是不推荐的哦,FLIP-115后才可以在streaming模式支持这类场景。
你可以描述下详细堆栈、应用场景、SQL吗? Best, Jingsong Lee On Wed, Apr 1, 2020 at 2:56 PM sunfulin <[email protected]> wrote: > > 我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh > > org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not > enough rules to produce a node with desired properties > > > > > > > 在 2020-04-01 14:49:41,"Jingsong Li" <[email protected]> 写道: > >Hi, > > > >异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1] > > > >[1] > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > > > >Best, > >Jingsong Lee > > > >On Wed, Apr 1, 2020 at 2:32 PM sunfulin <[email protected]> wrote: > > > >> Hi, > >> 我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into > >> xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。 > >> cc @Jingsong Li @Jark Wu > >> > >> > >> > >> > >> org.apache.flink.table.api.TableException: Stream Tables can only be > >> emitted by AppendStreamTableSink, RetractStreamTableSink, or > >> UpsertStreamTableSink. > >> > >> at > >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136) > >> > >> at > >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) > >> > >> at > >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >> > >> at > >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >> > >> at > >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) > >> > >> at > >> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) > >> > >> at > >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > >> > >> at scala.collection.Iterator.foreach(Iterator.scala:937) > >> > >> at scala.collection.Iterator.foreach$(Iterator.scala:937) > >> > >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > >> > >> at scala.collection.IterableLike.foreach(IterableLike.scala:70) > >> > >> at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > >> > >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > >> > >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) > >> > >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > >> > >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) > >> > >> at > >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > >> > >> at > >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) > >> > >> at > >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > >> > >> at > >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > >> > >> at > >> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87) > >> > >> at > >> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j > > > > > > > >-- > >Best, Jingsong Lee > > > > > -- Best, Jingsong Lee
