Re: Re: Exception when writing to Hive in real time with Flink
When I switched to batch mode, it threw the following exception instead. It feels like there is a pitfall at every step... sigh org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties On 2020-04-01 14:49:41, "Jingsong Li" wrote: >Hi, > >The exception means that the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1]. > >[1] >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > >Best, >Jingsong Lee
Re: Exception when writing to Hive in real time with Flink
Hi, The exception means that the Hive sink does not support streaming mode yet; it can only be used in batch mode. The feature is under development [1]. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table Best, Jingsong Lee On Wed, Apr 1, 2020 at 2:32 PM sunfulin wrote: > Hi, > I'm using Flink to consume data from Kafka and write it into Hive. The configuration and connections are all fine, but when actually executing insert into > xxx_table, the following exception was thrown. I can't work out the cause; could someone more experienced help? > cc @Jingsong Li @Jark Wu > > org.apache.flink.table.api.TableException: Stream Tables can only be > emitted by AppendStreamTableSink, RetractStreamTableSink, or > UpsertStreamTableSink.
Exception when writing to Hive in real time with Flink
Hi, I'm using Flink to consume data from Kafka and write it into Hive. The configuration and connections are all fine, but when actually executing insert into xxx_table, the following exception was thrown. I can't work out the cause; could someone more experienced help? cc @Jingsong Li @Jark Wu
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
    at com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
[Feedback wanted] Making the Blink planner the default planner in 1.11
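Hi everyone, As you all know, the Blink planner is a brand-new translator and optimizer for the Table API and SQL that was introduced in Flink 1.9, and in 1.10 we already made it the default planner of the SQL CLI. The community decided long ago not to add any new features to the old optimizer. As a result, the old Flink planner now lacks many of the community's latest SQL features in both functionality and performance. Examples include the new type system, broader DDL support, and the new TableSource and TableSink interfaces coming in 1.11, together with the parsing of binlog-style logs that they enable. We therefore plan to try making the Blink planner the default planner when 1.11 is released. Before doing so, we would like to hear your feedback. We are especially interested in your experience using it, and in anything you have observed or run into that the Blink planner cannot do and that made you fall back to the old Flink planner. That could be a feature gap between the two, or a new bug in the Blink planner that the old planner does not have. There is about a month left before the 1.11 feature freeze, so once we receive your feedback we will have enough time to fix and improve things. We look forward to hearing your valuable opinions. Thanks. Best, Kurt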
Re: Re: How to effectively clear state in a ProcessWindowFunction?
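Hi, I find it strange that your whole program can run at all when it starts without a checkpoint. Your value state descriptor does not define a default value, so calling #value() returns null. Yet the first call to #update still reads the value from the state, and the job still runs through, which is strange. I suggest debugging it locally in your IDE. You could change the clear condition so that it does not wait until the next day to clean up, which would let you reproduce the problem locally. Best, Yun Tang From: 守护 <346531...@qq.com> Sent: Tuesday, March 31, 2020 16:31 To: user-zh Subject: Re: How to effectively clear state in a ProcessWindowFunction? Thanks for your reply. The check if(stateDate.equals("") || stateDate.equals(date)) in the code does reach pv_st.clear(). However: 1. When I look at the final output, the state in pv_st has not been cleared and the count keeps accumulating. 2. After state.clear(), reading the state again returns null, and the code snippet indeed has no null check; it updates the value directly with pv_st.update(pv_st.value() + c_st). Could this be related? -- Original message -- From: "Yun Tang"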
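Yun Tang's puzzle about why the job runs at all may have a Scala-specific explanation. Suppose a generic method declared to return `T` actually returns `null`, and the caller uses `T = Long` (that is, `scala.Long`). In that case the Scala compiler unboxes the result with `BoxesRunTime.unboxToLong`, which turns `null` into `0L` instead of throwing. The sketch below does not use Flink. `FakeValueState` is a hypothetical stand-in that only mimics the `value()`/`update()`/`clear()` shape of a `ValueState`, under the assumption that the real state also returns `null` when it is empty and no default is set. If that assumption holds, the missing null check is probably not why the count keeps accumulating.

```scala
// Hypothetical stand-in for a keyed ValueState without a default value:
// it stores a boxed reference and returns null when empty or cleared.
class FakeValueState[T] {
  private var stored: AnyRef = null

  // Erased generic return type: the caller decides how to unbox the reference.
  def value(): T = stored.asInstanceOf[T]

  def update(v: T): Unit = stored = v.asInstanceOf[AnyRef]

  def clear(): Unit = stored = null
}

object NullUnboxingDemo {
  // Mirrors `pv_st.update(pv_st.value() + c_st)` from the question.
  def addCount(state: FakeValueState[Long], count: Int): Long = {
    state.update(state.value() + count) // a null value() is unboxed to 0L here
    state.value()
  }

  def main(args: Array[String]): Unit = {
    val pv = new FakeValueState[Long]
    println(addCount(pv, 3)) // 3: the empty state was read as 0L
    println(addCount(pv, 2)) // 5
    pv.clear()
    println(addCount(pv, 4)) // 4: after clear(), null is read as 0L again
  }
}
```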
Flink 1.10: storing the catalog in Hive
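hi: We would like to use Hive to store Flink catalog data. When metadata is saved or deleted, how can we check whether the user has permission to operate on the Hive metadata?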
Re: Question about the flink 1.6 memory config
The container cut-off accounts not only for metaspace, but also for native memory footprint such as thread stacks, code cache, and compressed class space. If you run streaming jobs with the RocksDB state backend, it also accounts for RocksDB memory usage. The consequence of a smaller cut-off depends on your environment and workloads. For standalone clusters, the cut-off has no effect at all. For containerized environments, depending on your Yarn/Mesos configuration, your container may or may not get killed for exceeding the container memory. Thank you~ Xintong Song On Tue, Mar 31, 2020 at 5:34 PM LakeShen wrote: > Hi community, > > Can this cut-off memory configuration be reduced, like > making the containerized.heap-cutoff-ratio be 0.15. > Is there any problem for this config? > > Best wishes, > LakeShen >
Is there a good account authentication method for the Flink dashboard?
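Hi all, While building a Flink platform, I found that the dashboard has no account or permission control: anyone who knows the address can access it and submit jobs, which seems quite risky. How do you handle this in production? I've read online that authentication can be done through Nginx, but that feels a bit inelegant. Does Flink provide an official authentication plugin? Is there a somewhat more elegant solution? Looking forward to your replies.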
Question about the flink 1.6 memory config
Hi community, I am currently tuning the task memory configuration of Flink 1.6. Looking at the source code, Flink first configures the cut-off memory: cut-off memory = Math.max(600, containerized.heap-cutoff-ratio * TaskManager memory), where containerized.heap-cutoff-ratio defaults to 0.25. For example, if the TaskManager memory is 4 GB, the cut-off memory is 1 GB. However, after enabling the TaskManager's GC log, I found that metaspace only uses 60 MB. I personally feel that the cut-off memory is a little too large. Can the cut-off be reduced, for example by setting containerized.heap-cutoff-ratio to 0.15? Is there any problem with this configuration? I am looking forward to your reply. Best wishes, LakeShen
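The cut-off formula quoted in the question can be checked with a little arithmetic. The sketch below assumes container memory in MB, the 600 MB floor from the formula, and truncation to whole megabytes. The exact rounding in Flink's code is not shown in the thread, and the names `CutoffCalc`, `cutoffMb`, and `remainingMb` are hypothetical. The "remaining" figure is simply the container size minus the cut-off; how Flink then divides that remainder among its heap and other memory pools is not modeled here.

```scala
object CutoffCalc {
  // Minimum cut-off in MB, i.e. the 600 in Math.max(600, ...) from the question.
  val MinCutoffMb: Long = 600L

  // cut-off = max(600, ratio * containerMemory), truncated to whole MB (assumed).
  def cutoffMb(containerMb: Long, ratio: Double): Long =
    math.max(MinCutoffMb, (containerMb * ratio).toLong)

  // Container memory left over once the cut-off is subtracted.
  def remainingMb(containerMb: Long, ratio: Double): Long =
    containerMb - cutoffMb(containerMb, ratio)

  def main(args: Array[String]): Unit =
    for (ratio <- Seq(0.25, 0.15, 0.10))
      println(s"ratio=$ratio cutoff=${cutoffMb(4096, ratio)} MB remaining=${remainingMb(4096, ratio)} MB")
}
```

For a 4096 MB container, a ratio of 0.25 gives the 1024 MB from the question, and 0.15 gives about 614 MB. Below a ratio of roughly 0.146 (600 / 4096), the 600 MB floor takes over. As Xintong Song's reply points out, whether the smaller value is safe depends on native usage beyond metaspace, which this arithmetic cannot tell you.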
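Re: How to effectively clear state in a ProcessWindowFunction?
Thanks for your reply. The check if(stateDate.equals("") || stateDate.equals(date)) in the code does reach pv_st.clear(). However: 1. When I look at the final output, the state in pv_st has not been cleared and the count keeps accumulating. 2. After state.clear(), reading the state again returns null, and the code snippet indeed has no null check; it updates the value directly with pv_st.update(pv_st.value() + c_st). Could this be related? -- Original message -- From: "Yun Tang"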
Re: How to effectively clear state in a ProcessWindowFunction?
Hi, From the code, for if(stateDate.equals("") || stateDate.equals(date)), I can't tell where the stateDate variable gets its value, so it's unclear whether this check actually takes effect. Second, after state.clear(), reading the state again returns null, and I don't see any check for this in the code snippet either. Best, Yun Tang From: 守护 <346531...@qq.com> Sent: Tuesday, March 31, 2020 12:33 To: user-zh Subject: How to effectively clear state in a ProcessWindowFunction? Hi all:
--Version
Flink 1.10.0 on YARN
--Process
1. Define a .window(TumblingProcessingTimeWindows.of(Time.days(1))) window.
2. Define a new Trigger(): .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))).
3. Define a new ProcessWindowFunction() that computes at fixed intervals. The business requirement is to count each day's data starting from 00:00, then clear and start counting again on the next day.
--Problem
In the new ProcessWindowFunction() I created a ValueState that I want to clear at 00:00 on the next day so counting starts over. However, the ValueState was not cleared; it kept accumulating on top of the previous day's value. Where should the .clear() call go for it to take effect?
--Partial code
    .window(TumblingProcessingTimeWindows.of(Time.days(1)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
    .process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {
      private var pv_st: ValueState[Long] = _

      override def open(parameters: Configuration): Unit = {
        pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
      }

      override def process(key: Tuple, context: Context, elements: Iterable[(String, String, Long)], out: Collector[String]): Unit = {
        var c_st = 0
        val elementsIterator = elements.iterator
        // Iterate over the window data to get the unique words
        while (elementsIterator.hasNext) {
          val ac_name = elementsIterator.next()._2
          if (!ac_name.isEmpty && ac_name.equals("listentime")) {
            c_st += 1
          }
        }
        val time: Date = new Date()
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("-MM-dd")
        val date = dateFormat.format(time)
        // add current
        pv_st.update(pv_st.value() + c_st)
        var jsonStr = "" + key.getField(0) + "_" + date + "&"
        // Start of the JSON output
        jsonStr += "{" + "\"yesterday_foreground_play_pv\":\"" + pv_st.value() + "\"}";
        // Check whether it is the next day; if so, clear all state data and start accumulating again
        if (stateDate.equals("") || stateDate.equals(date)) {
          stateDate = date
          out.collect(jsonStr)
        } else {
          out.collect(jsonStr)
          pv_st.clear()
          stateDate = date
        }
      }
    })
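Yun Tang notes that the snippet does not show where `stateDate` is assigned. Suppose, and this is only an assumption, that it is an ordinary `var` field of the function object. In that case it is shared by every key handled by that parallel instance, whereas `pv_st` is keyed state with one value per key. The plain-Scala simulation below models that assumption. All names are hypothetical, and a `Map` stands in for the per-key state. When the date changes, only the first key processed sees a different `stateDate` and gets cleared. Every later key sees the already-updated date and keeps accumulating.

```scala
import scala.collection.mutable

// Hypothetical model of the question's logic: counts are per key (like keyed
// ValueState), but `stateDate` is a single field shared by all keys.
class SharedDateCounter {
  private val counts = mutable.Map.empty[String, Long] // stands in for pv_st, per key
  private var stateDate = ""                          // shared across keys (the assumed bug)

  // Same order as the question: update, emit, then clear if the day changed.
  def process(key: String, date: String, newEvents: Long): Long = {
    val total = counts.getOrElse(key, 0L) + newEvents
    counts(key) = total
    if (stateDate == "" || stateDate == date) {
      stateDate = date
    } else {
      counts.remove(key) // like pv_st.clear(): affects only the current key
      stateDate = date
    }
    total // the value that would be emitted
  }

  def current(key: String): Long = counts.getOrElse(key, 0L)
}

object SharedDateDemo {
  def main(args: Array[String]): Unit = {
    val c = new SharedDateCounter
    c.process("user1", "2020-03-30", 10)
    c.process("user2", "2020-03-30", 7)
    c.process("user1", "2020-03-31", 1) // new date seen first by user1: user1 cleared
    c.process("user2", "2020-03-31", 1) // stateDate already updated: user2 not cleared
    println(s"user1=${c.current("user1")}, user2=${c.current("user2")}") // user1=0, user2=8
  }
}
```

If this matches the real code, one option is to keep the date in keyed state as well, so that each key tracks its own day. This suggestion is not from the thread.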
Re: Out-of-order handling after keyBy
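Hi, let me add more detail about my scenario, as shown in the figure below: 1. Partition 1 of Kafka TopicA contains data from 3 users. 2. Flink generates the watermarks w1, w2, w3... for this partition. Here are my questions: 1. Can the watermarks w1, w2, w3... only guarantee ordered processing of the combined data of user1, user2 and user3? 2. After keyBy on user1, user2 and user3, can the watermarks w1, w2, w3... guarantee ordered processing for user1, user2 or user3 individually? Looking forward to replies from the experts! [image: image.png] On Tue, Mar 31, 2020 at 1:10 PM, jun su wrote: > hi, > After keyBy, the watermark should be the minimum of the watermarks from the multiple upstream threads. So although the data may be out of order, the watermark will not be, > and it will not affect subsequent window triggering. > > On Tue, Mar 31, 2020 at 9:54 AM, tingli ke wrote: > > Hello, > > Regarding your reply, the current scenario is as follows: > > 1. Kafka has multiple partitions, and the Flink watermark assigner emits > > watermarks for each partition; > > 2. Given the first premise, with watermarks already set, can watermarks be set again after keyBy? > > 3. Is there an approach that skips the first premise and sets watermarks directly after keyBy? > > > > On Mon, Mar 30, 2020 at 9:13 PM, Jimmy Wong wrote: > > > Hi, > > > Watermarks can be assigned after keyBy, but it is best to assign them right after the SourceFunction. Going through keyBy > > > or other partitioning strategies may cause larger data delays (in event time). > > > I don't quite understand the phrase "I want to do keyed out-of-order handling"; could you explain? > > > Jimmy Wong > > > wangzmk...@163.com > > > On 2020-03-30 20:58, tingli ke wrote: > > > A question: can the kafka-per-partition watermark assignment be done after keyBy? Is keyed out-of-order handling supported? > > -- > Best, > Jun Su >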
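jun su's explanation is that the watermark after keyBy is the minimum of the upstream watermarks. That idea can be sketched as follows. This is a simplified plain-Scala model, not Flink's actual implementation, and `MinWatermarkTracker` is a hypothetical name. A downstream task remembers the latest watermark from each input channel and forwards a new watermark only when the minimum across channels advances. The model also suggests why a watermark does not put records for a key in order. It only tracks event-time progress, while the records themselves still arrive in whatever order the channels deliver them.

```scala
// Hypothetical model of how a task downstream of keyBy combines watermarks
// from several input channels: track the latest watermark per channel and
// emit the minimum whenever that minimum moves forward.
class MinWatermarkTracker(numChannels: Int) {
  private val perChannel = Array.fill(numChannels)(Long.MinValue)
  private var lastEmitted = Long.MinValue

  // Returns Some(newWatermark) if the combined watermark advanced, else None.
  def onWatermark(channel: Int, timestamp: Long): Option[Long] = {
    // A channel's watermark only moves forward; ignore regressions.
    perChannel(channel) = math.max(perChannel(channel), timestamp)
    val combined = perChannel.min
    if (combined > lastEmitted) {
      lastEmitted = combined
      Some(combined)
    } else None
  }
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new MinWatermarkTracker(2)
    println(tracker.onWatermark(0, 10)) // None: channel 1 has not reported yet
    println(tracker.onWatermark(1, 5))  // Some(5)
    println(tracker.onWatermark(0, 20)) // None: channel 1 still holds it at 5
    println(tracker.onWatermark(1, 15)) // Some(15)
  }
}
```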
Re: How to define DDL for dynamic nested fields in Flink SQL
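Hi, Yes. I had tried this before but didn't write the properties, so no values showed up, and I assumed MAP was not supported. When using it, data['a'] works fine. Best, Xinghalo On 2020-03-31 14:59, Benchao Li wrote: You can try defining the data field as a map type. On Tue, Mar 31, 2020 at 2:56 PM, 111 wrote: Hi, We are using StreamSets as a CDC tool, and the content it outputs to Kafka has nested, varying types, such as: {database:a, table: b, type:update, data:{a:1,b:2,c:3}} {database:a, table: c, type:update, data:{c:1,d:2}} How should the DDL be defined for this kind of data? Best, Xinghalo -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn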
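Benchao Li's suggestion works because a `MAP` column does not fix the set of keys, so records whose `data` objects have different fields can share one schema. For the sample records, the column might be declared as `data MAP<STRING, INT>`. The value type is an assumption based on the sample values, and the other columns and the connector properties are omitted. The plain-Scala sketch below only models the idea. `CdcRecord` is hypothetical, and `op` replaces `type`, which is a Scala keyword. A lookup by key, like `data['a']`, returns a value when the key is present and nothing otherwise, which presumably corresponds to NULL in SQL.

```scala
// Hypothetical model of the CDC records from the question: the fixed fields
// become ordinary columns, and the variable `data` object becomes a map.
final case class CdcRecord(database: String, table: String, op: String, data: Map[String, Int])

object MapColumnDemo {
  // Rough analogue of data['a'] in SQL: None stands in for NULL.
  def field(record: CdcRecord, key: String): Option[Int] = record.data.get(key)

  val records: Seq[CdcRecord] = Seq(
    CdcRecord("a", "b", "update", Map("a" -> 1, "b" -> 2, "c" -> 3)),
    CdcRecord("a", "c", "update", Map("c" -> 1, "d" -> 2))
  )

  def main(args: Array[String]): Unit =
    records.foreach(r => println(s"table=${r.table} a=${field(r, "a")} c=${field(r, "c")}"))
}
```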
Re: [Third-party Tool] Flink memory calculator
Hi there. In the latest version, the calculator supports dynamic options. You can append all your dynamic options to the end of "bin/calculator.sh [-h]". Since "-tm" will eventually be deprecated, please replace it with "-Dtaskmanager.memory.process.size=". Best, Yangze Guo On Mon, Mar 30, 2020 at 12:57 PM Xintong Song wrote: > > Hi Jeff, > > I think the purpose of this tool is to allow users to play with the memory > configurations without needing to actually deploy the Flink cluster or even > have a job. For sanity checks, we currently have them in the start-up scripts > (for standalone clusters) and resource managers (on K8s/Yarn/Mesos). > > I think it makes sense to do the checks earlier, i.e. on the client side. But > I'm not sure if JobListener is the right place. IIUC, JobListener is invoked > before submitting a specific job, while the mentioned checks validate Flink's > cluster-level configurations. It might be okay for a job cluster, but it does > not cover the scenarios of session clusters. > > Thank you~ > > Xintong Song > > > > On Mon, Mar 30, 2020 at 12:03 PM Yangze Guo wrote: >> >> Thanks for your feedback, @Xintong and @Jeff. >> >> @Jeff >> I think it would always be good to leverage existing logic in Flink, such >> as JobListener. However, this calculator is not only meant to check for >> conflicts; it is also meant to provide the calculation result to >> users before the job is actually deployed, in case there is any >> unexpected configuration. It's a good point that we need to parse the >> dynamic configs. I prefer to parse the dynamic configs and CLI >> commands in bash instead of adding a hook in JobListener. >> >> Best, >> Yangze Guo >> >> On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang wrote: >> > >> > Hi Yangze, >> > >> > Does this tool just parse the configuration in flink-conf.yaml? Maybe it >> > could be done in JobListener [1] (we should enhance it by adding a hook >> > before job submission), so that it could cover all the cases (e.g. 
parameters >> > coming from the command line) >> > >> > [1] >> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35 >> > >> > >> > On Mon, Mar 30, 2020 at 9:40 AM, Yangze Guo wrote: >> >> >> >> Hi, Yun, >> >> >> >> I'm sorry that it currently cannot handle that. But I think it is a >> >> really good idea, and that feature will be added in the next version. >> >> >> >> Best, >> >> Yangze Guo >> >> >> >> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang wrote: >> >> > >> >> > Very interesting and convenient tool. Just a quick question: could this >> >> > tool also handle cluster deployment commands like "-tm" mixed with >> >> > configuration in `flink-conf.yaml`? >> >> > >> >> > Best >> >> > Yun Tang >> >> > >> >> > From: Yangze Guo >> >> > Sent: Friday, March 27, 2020 18:00 >> >> > To: user ; user-zh@flink.apache.org >> >> > >> >> > Subject: [Third-party Tool] Flink memory calculator >> >> > >> >> > Hi there. >> >> > >> >> > In release-1.10, the memory setup of task managers has changed a lot. >> >> > I would like to share a third-party tool to simulate and get >> >> > the calculation result of Flink's memory configuration. >> >> > >> >> > Although there are already an official detailed setup guide [1] and migration >> >> > guide [2], the calculator could further allow users to: >> >> > - Verify if there is any conflict in their configuration. The >> >> > calculator is more lightweight than starting a Flink cluster, >> >> > especially when running Flink on Yarn/Kubernetes. Users could make sure >> >> > their configuration is correct locally before deploying it to external >> >> > resource managers. >> >> > - Get all of the memory configurations before deploying. Users may set >> >> > taskmanager.memory.task.heap.size and taskmanager.memory.managed.size, >> >> > but they also want to know the total memory consumption of Flink. 
With >> >> > this tool, users could get all of the memory configurations they are >> >> > interested in. If anything is unexpected, they would not need to >> >> > re-deploy a Flink cluster. >> >> > >> >> > The repo link of this tool is >> >> > https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the >> >> > BashJavaUtils.jar of Flink and ensures the calculation result is >> >> > exactly the same as your Flink dist. For more details, please take a >> >> > look at the README. >> >> > >> >> > Any feedback or suggestions are welcome! >> >> > >> >> > [1] >> >> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html >> >> > [2] >> >> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html >> >> > >> >> > Best, >> >> > Yangze Guo >> > >> > >> > >> > -- >> > Best Regards >> > >> > Jeff Zhang
Re: How to define DDL for dynamic nested fields in Flink SQL
You can try defining the data field as a map type. On Tue, Mar 31, 2020 at 2:56 PM, 111 wrote: > Hi, > We are using StreamSets as a CDC tool, and the content it outputs to Kafka has nested, varying types, such as: > {database:a, table: b, type:update, data:{a:1,b:2,c:3}} > {database:a, table: c, type:update, data:{c:1,d:2}} > How should the DDL be defined for this kind of data? > > Best, > Xinghalo -- Benchao Li