Re: Re: Exception when writing to Hive in real time with Flink

2020-03-31 Post by sunfulin



When I use batch mode, the following exception is thrown instead. It feels like one pitfall after another... sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties














On 2020-04-01 14:49:41, "Jingsong Li" wrote:
>Hi,
>
>The exception means that the Hive sink does not yet support streaming mode; it can only be used in batch mode. The feature is under development [1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>> Hi,
>> I am using Flink to consume data from Kafka and write it to Hive. The connection configuration is fine, but when actually executing insert into
>> xxx_table, the exception below is thrown. I cannot tell what causes it; any guidance is appreciated.
>> cc  @Jingsong Li  @Jark Wu
>>
>>
>>
>>
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>> [stack trace identical to the one in the original message below]
>
>
>
>-- 
>Best, Jingsong Lee


Re: Exception when writing to Hive in real time with Flink

2020-03-31 Post by Jingsong Li
Hi,

The exception means that the Hive sink does not yet support streaming mode; it can only be used in batch mode. The feature is under development [1]

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:

> Hi,
> I am using Flink to consume data from Kafka and write it to Hive. The connection configuration is fine, but when actually executing insert into
> xxx_table, the exception below is thrown. I cannot tell what causes it; any guidance is appreciated.
> cc  @Jingsong Li  @Jark Wu
>
>
>
>
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
> [stack trace identical to the one in the original message below]



-- 
Best, Jingsong Lee
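
The reply above suggests running the job in batch mode. For reference, a minimal sketch of how a blink-planner batch-mode TableEnvironment is typically created in Flink 1.10; this is an illustration only, the table names and job name are placeholders, and it is not code from the thread:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object HiveBatchInsertSketch {
  def main(args: Array[String]): Unit = {
    // Batch mode is only available with the blink planner.
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val tableEnv = TableEnvironment.create(settings)

    // A HiveCatalog would normally be registered here so that xxx_table resolves.
    tableEnv.sqlUpdate("INSERT INTO xxx_table SELECT * FROM some_source_table")
    tableEnv.execute("hive-batch-insert")
  }
}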


Exception when writing to Hive in real time with Flink

2020-03-31 Post by sunfulin
Hi, 
I am using Flink to consume data from Kafka and write it to Hive. The connection configuration is fine, but when actually executing insert into
xxx_table, the exception below is thrown. I cannot tell what causes it; any guidance is appreciated.
cc  @Jingsong Li  @Jark Wu 




org.apache.flink.table.api.TableException: Stream Tables can only be emitted by 
AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)

 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)

 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)

 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

 at scala.collection.Iterator.foreach(Iterator.scala:937)

 at scala.collection.Iterator.foreach$(Iterator.scala:937)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)

 at scala.collection.IterableLike.foreach(IterableLike.scala:70)

 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)

 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

 at scala.collection.TraversableLike.map(TraversableLike.scala:233)

 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

 at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)

 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

 at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)

 at 
com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j

[Feedback wanted] Making the blink planner the default planner in 1.11

2020-03-31 Post by Kurt Young
Hi all,

As you all know, the Blink planner is a brand-new Table API and SQL optimizer introduced in Flink 1.9,
and it has already been made the default planner for the SQL CLI in 1.10. Since the community decided
long ago not to add any new features to the old optimizer, the old flink planner is now missing many of
the community's latest SQL features, such as the new type system, broader DDL support, and the new
TableSource and TableSink interfaces coming in 1.11 together with the parsing of binlog-style changelogs.

We therefore plan to make the blink planner the default planner in the upcoming 1.11 release. Before that,
we would like to hear your feedback, in particular about your experience using it and about anything you
observed or ran into that the blink planner could not do and that made you fall back to the old flink planner,
whether a feature gap between the two or a bug in the blink planner that the old planner does not have.
We are roughly one month away from the 1.11 feature freeze, which leaves enough time to fix and polish
things after receiving your feedback.

We look forward to your valuable feedback. Thank you.

Best,
Kurt
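
For readers who want to try the blink planner and compare it with the legacy planner before sending feedback, a minimal sketch of selecting each one explicitly; this is a generic illustration, not part of the announcement, and the variable names are placeholders:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

object PlannerSelectionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Blink planner, already the SQL CLI default since 1.10 and the proposed 1.11 default.
    val blinkSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkTableEnv = StreamTableEnvironment.create(env, blinkSettings)

    // Legacy flink planner, useful for comparing behavior when reporting a gap or bug.
    val legacySettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val legacyTableEnv = StreamTableEnvironment.create(env, legacySettings)
  }
}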


Re: Re: How to effectively clear state in a ProcessWindowFunction

2020-03-31 Post by Yun Tang
Hi

I find it odd that your program can run at all when started from scratch without a checkpoint. Your value state descriptor does not define a default
value, so calling #value() returns null, yet the first call to #update still reads the value from state; it is strange that this runs through at all.

I suggest debugging this locally in an IDE. You can change the clear condition so that it does not wait until the next day to clean up, which makes the problem reproducible locally.

Best regards
Yun Tang

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 16:31
To: user-zh 
Subject: Re: How to effectively clear state in a ProcessWindowFunction

Thanks for your reply.


The if(stateDate.equals("") || stateDate.equals(date)) condition in the code does reach pv_st.clear(). 1. When the final result is emitted, the state
in pv_st has not been cleared and is still accumulating. 2. After state.clear(), the next read returns null, and the code snippet indeed has no check
for null; it directly updates with pv_st.update(pv_st.value() + c_st). Could that be related?








-- Original message --
From: "Yun Tang"

Storing the Flink 1.10 catalog in Hive

2020-03-31 Post by 宇张
hi:
We would like to use Hive to store Flink catalog metadata. When saving or deleting metadata, how can we check whether we have permission to operate on the Hive metastore?
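
For context, a minimal sketch of how catalog metadata reaches the Hive metastore in Flink 1.10; the catalog name, default database, hive-site.xml directory, Hive version, and the DDL statement are placeholders, not from the thread. Permission checking itself would normally happen on the metastore side for the user Flink connects as, depending on how the metastore is secured:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Placeholders: catalog name, default database, hive-site.xml directory, Hive version.
    val hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4")
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    // DDL executed now goes through the Hive metastore client; whether it succeeds
    // depends on the connecting user's metastore-side permissions.
    tableEnv.sqlUpdate("CREATE DATABASE IF NOT EXISTS flink_meta_test")
  }
}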


Re: Question about the flink 1.6 memory config

2020-03-31 文章 Xintong Song
The container cut-off accounts for not only metaspace, but also native
memory footprint such as thread stack, code cache, compressed class space.
If you run streaming jobs with rocksdb state backend, it also accounts for
the rocksdb memory usage.

The consequence of less cut-off depends on your environment and workloads.
For standalone clusters, the cut-off will not take any effect. For
containerized environments, depending on Yarn/Mesos configurations your
container may or may not get killed due to exceeding the container memory.

Thank you~

Xintong Song



On Tue, Mar 31, 2020 at 5:34 PM LakeShen  wrote:

> Hi community,
>
> Now I am optimizing the memory configuration of a Flink 1.6 task. Looking at the
> source code, the cut-off memory is configured first: cut-off
> memory = Math.max(600, containerized.heap-cutoff-ratio * TaskManager
> Memory), where containerized.heap-cutoff-ratio defaults to 0.25. For
> example, if the TaskManager memory is 4 GB, the cut-off memory is 1 GB.
>
> However, after checking the TaskManager's gc.log, I found that the metaspace only
> used about 60 MB. The cut-off memory configuration therefore seems a
> little too large to me. Can it be reduced, for example by
> setting containerized.heap-cutoff-ratio to 0.15?
> Would that configuration cause any problems?
>
> I am looking forward to your reply.
>
> Best wishes,
> LakeShen
>


Is there a good way to add authentication to the Flink dashboard?

2020-03-31 Post by 陈伟
Hi all,
While setting up our Flink platform I noticed that the dashboard has no account or permission control; anyone who knows the address can access it and submit jobs, which feels quite risky. How do you handle this in production?
I have seen suggestions online to put authentication in front of it with Nginx, but that feels a bit inelegant. Does Flink officially provide an authentication plugin, or is there a more elegant solution?
Looking forward to your replies.



Question about the flink 1.6 memory config

2020-03-31 文章 LakeShen
Hi community,

Now I am optimizing the memory configuration of a Flink 1.6 task. Looking at the
source code, the cut-off memory is configured first: cut-off
memory = Math.max(600, containerized.heap-cutoff-ratio * TaskManager
Memory), where containerized.heap-cutoff-ratio defaults to 0.25. For
example, if the TaskManager memory is 4 GB, the cut-off memory is 1 GB.

However, after checking the TaskManager's gc.log, I found that the metaspace only
used about 60 MB. The cut-off memory configuration therefore seems a little
too large to me. Can it be reduced, for example by setting
containerized.heap-cutoff-ratio to 0.15?
Would that configuration cause any problems?

I am looking forward to your reply.

Best wishes,
LakeShen
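
For a concrete feel of the numbers, a small sketch of the cut-off formula quoted above, evaluated in MB for the 4096 MB TaskManager and the 0.15 ratio mentioned in the question; this is illustrative arithmetic only:

object CutoffSketch {
  // cut-off memory = Math.max(600, containerized.heap-cutoff-ratio * TaskManager memory)
  def cutoffMb(tmMemoryMb: Double, ratio: Double): Double =
    math.max(600.0, ratio * tmMemoryMb)

  def main(args: Array[String]): Unit = {
    println(cutoffMb(4096, 0.25)) // 1024.0 MB, the roughly 1 GB mentioned above
    println(cutoffMb(4096, 0.15)) // 614.4 MB with the proposed ratio of 0.15
  }
}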


Re: How to effectively clear state in a ProcessWindowFunction

2020-03-31 Post by 守护
Thanks for your reply.


The if(stateDate.equals("") || stateDate.equals(date)) condition in the code does reach pv_st.clear(). 1. When the final result is emitted, the state in pv_st has not been cleared and is still accumulating. 2. After state.clear(), the next read returns null, and the code snippet indeed has no check for null; it directly updates with pv_st.update(pv_st.value() + c_st). Could that be related?


-- Original message --
From: "Yun Tang"

Re: How to effectively clear state in a ProcessWindowFunction

2020-03-31 Post by Yun Tang
Hi

From the code, if(stateDate.equals("") || stateDate.equals(date)) does not show where the stateDate variable gets its value, so it is unclear whether this condition actually takes effect.
Also, after state.clear(), the next read of the state returns null, and I cannot see anywhere in the code snippet that checks for this.

Best regards
Yun Tang

From: 守护 <346531...@qq.com>
Sent: Tuesday, March 31, 2020 12:33
To: user-zh 
Subject: How to effectively clear state in a ProcessWindowFunction

Hi all,


-- Version
Flink 1.10.0 on YARN


-- Setup
1. Define a window: .window(TumblingProcessingTimeWindows.of(Time.days(1)))
2. Define a trigger: .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
3. Define a new ProcessWindowFunction() that computes at a fixed interval. The business logic is to compute each day's data starting from 00:00 and to clear everything and recompute on the following day.
-- Problem
A ValueState is created inside the ProcessWindowFunction(). I want the ValueState to be cleared at 00:00 of the next day so that the computation starts over, but instead the ValueState is not cleared and keeps accumulating on top of the previous day's value. When should the .clear() call happen for it to take effect?



-- Partial code


.window(TumblingProcessingTimeWindows.of(Time.days(1)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {

  private var pv_st: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    pv_st = getRuntimeContext.getState[Long](
      new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
  }

  override def process(key: Tuple, context: Context,
                       elements: Iterable[(String, String, Long)],
                       out: Collector[String]): Unit = {
    var c_st = 0

    val elementsIterator = elements.iterator
    // iterate over the window data
    while (elementsIterator.hasNext) {
      val ac_name = elementsIterator.next()._2
      if (!ac_name.isEmpty && ac_name.equals("listentime")) {
        c_st += 1
      }
    }
    val time: Date = new Date()
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val date = dateFormat.format(time)

    // add current
    pv_st.update(pv_st.value() + c_st)

    var jsonStr = "" + key.getField(0) + "_" + date + "&" // start of the JSON payload
    jsonStr += "{" +
      "\"yesterday_foreground_play_pv\":\"" + pv_st.value() +
      "\"}";

    // check whether the day has rolled over; if so, clear the state and start accumulating again
    if (stateDate.equals("") || stateDate.equals(date)) {
      stateDate = date
      out.collect(jsonStr)
    } else {
      out.collect(jsonStr)
      pv_st.clear()
      stateDate = date
    }
  }
})
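
Following up on the null point made in the reply above, a minimal sketch of a defensive read, assuming the state is declared with a boxed java.lang.Long; the helper object and the readPv name are illustrative and not from the thread:

import org.apache.flink.api.common.state.ValueState

object PvStateHelper {
  // After clear(), ValueState#value() returns null when the state type is a boxed
  // java.lang.Long, so read it defensively and fall back to 0 before accumulating.
  def readPv(pvState: ValueState[java.lang.Long]): Long = {
    val v = pvState.value()
    if (v == null) 0L else v.longValue()
  }
}

With pv_st declared as ValueState[java.lang.Long], the update line would then read pv_st.update(PvStateHelper.readPv(pv_st) + c_st).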


Re: Handling out-of-order data after keyBy

2020-03-31 Post by tingli ke
Hi, let me add more detail about my scenario, as shown in the figure below:
1. Partition 1 of Kafka topic A contains data for three users.
2. Flink generates watermarks w1, w2, w3, ... for that partition.

The questions:
1. Do the watermarks w1, w2, w3, ... only guarantee ordered processing of the combined data of user1, user2 and user3?
2. After keyBy on user1, user2 and user3, can the watermarks w1, w2, w3, ... guarantee ordered processing of user1, user2 or user3 individually?

Looking forward to your replies!
[image: image.png]

jun su wrote on Tue, Mar 31, 2020 at 1:10 PM:

> Hi,
> The watermark after keyBy should be the minimum of the watermarks from the multiple upstream threads, so even though the data may be out of order, the watermark itself does not go backwards
> and the subsequent window triggering is not affected.
>
> tingli ke wrote on Tue, Mar 31, 2020 at 9:54 AM:
>
> > Hello,
> > Regarding your reply, my scenario is the following:
> > 1. Kafka has multiple partitions, and the Flink watermark assigner emits a
> > watermark for each partition;
> > 2. Given the first point, the watermark is already set; can another watermark still be assigned after keyBy?
> > 3. Is there a way to skip the first prerequisite and assign the watermark directly after keyBy?
> >
> > Jimmy Wong wrote on Mon, Mar 30, 2020 at 9:13 PM:
> >
> > > Hi,
> > > The watermark can be assigned after keyBy, but it is best to assign it right after the SourceFunction. Going through keyBy
> > > or another partitioning strategy may introduce larger event-time delays.
> > >
> > >
> > > I did not quite understand the sentence "I want to handle out-of-order data per key"; could you explain it?
> > >
> > >
> > > Jimmy Wong
> > > wangzmk...@163.com
> > > Signature customized by NetEase Mail Master
> > >
> > >
> > > On 2020-03-30 20:58, tingli ke wrote:
> > > A question: can the per-Kafka-partition watermark assignment be done after keyBy instead? I want to handle out-of-order data per key; is that supported?
> > >
> >
>
>
> --
> Best,
> Jun Su
>
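
A minimal sketch of the setup being discussed, assuming the Flink 1.10 Kafka consumer API; the topic name, properties, and record parsing are placeholders. Assigning the extractor directly on the consumer gives per-partition watermarks (the first premise above), and after keyBy each key sees the minimum of the upstream watermarks, which is the behavior jun su describes:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

case class UserEvent(user: String, ts: Long)

object PerPartitionWatermarkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "watermark-sketch")        // placeholder

    val consumer = new FlinkKafkaConsumer[String]("TopicA", new SimpleStringSchema(), props)

    // Assigning the timestamp/watermark extractor on the consumer makes it per Kafka partition.
    consumer.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
        override def extractTimestamp(element: String): Long =
          element.split(",")(1).toLong // placeholder parsing of the event time
      })

    // After keyBy, each key sees the minimum watermark of its upstream inputs;
    // records of a single key can still arrive out of order within that bound.
    env.addSource(consumer)
      .map(line => UserEvent(line.split(",")(0), line.split(",")(1).toLong))
      .keyBy(_.user)
      .print()

    env.execute("per-partition-watermark-sketch")
  }
}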


Re: How to define DDL for dynamically nested fields in Flink SQL

2020-03-31 Post by 111


Hi,
I tried it earlier, but I had not declared the nested attributes, so no values showed up and I assumed MAP was not supported.
Accessing the fields with data['a'] works fine.
Best,
Xinghalo
On 2020-03-31 14:59, Benchao Li wrote:
You can try defining the data field as a MAP type.

111 wrote on Tue, Mar 31, 2020 at 2:56 PM:

Hi,
We use StreamSets as our CDC tool, and the records it writes to Kafka have nested, variable structures, for example:
{database:a, table: b, type:update, data:{a:1,b:2,c:3}}
{database:a, table: c, type:update, data:{c:1,d:2}}
How should the DDL be defined for this kind of data?


Best,
Xinghalo



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
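
A minimal sketch of the MAP-typed DDL suggested above, executed through the Table API; the table name, topic, connector properties, and value types are illustrative assumptions rather than details from the thread. Individual keys are then read as data['a'], data['b'], and so on:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

object CdcMapDdlSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // The variable "data" payload is declared as MAP<STRING, STRING>; the outer
    // fields stay flat, and reserved words are escaped with backticks.
    tableEnv.sqlUpdate(
      """
        |CREATE TABLE cdc_source (
        |  `database` STRING,
        |  `table` STRING,
        |  `type` STRING,
        |  `data` MAP<STRING, STRING>
        |) WITH (
        |  'connector.type' = 'kafka',
        |  'connector.version' = 'universal',
        |  'connector.topic' = 'cdc_topic',
        |  'connector.properties.bootstrap.servers' = 'localhost:9092',
        |  'format.type' = 'json',
        |  'format.derive-schema' = 'true'
        |)
      """.stripMargin)

    // Nested keys are accessed with the map subscript syntax mentioned above.
    val result = tableEnv.sqlQuery("SELECT `database`, `table`, `data`['a'] AS a FROM cdc_source")
  }
}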


Re: [Third-party Tool] Flink memory calculator

2020-03-31 文章 Yangze Guo
Hi, there.

In the latest version, the calculator supports dynamic options. You
could append all your dynamic options to the end of "bin/calculator.sh
[-h]".
Since "-tm" will be deprecated eventually, please replace it with
"-Dtaskmanager.memory.process.size=".

Best,
Yangze Guo

On Mon, Mar 30, 2020 at 12:57 PM Xintong Song  wrote:
>
> Hi Jeff,
>
> I think the purpose of this tool it to allow users play with the memory 
> configurations without needing to actually deploy the Flink cluster or even 
> have a job. For sanity checks, we currently have them in the start-up scripts 
> (for standalone clusters) and resource managers (on K8s/Yarn/Mesos).
>
> I think it makes sense do the checks earlier, i.e. on the client side. But 
> I'm not sure if JobListener is the right place. IIUC, JobListener is invoked 
> before submitting a specific job, while the mentioned checks validate Flink's 
> cluster level configurations. It might be okay for a job cluster, but does 
> not cover the scenarios of session clusters.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 30, 2020 at 12:03 PM Yangze Guo  wrote:
>>
>> Thanks for your feedbacks, @Xintong and @Jeff.
>>
>> @Jeff
>> I think it would always be good to leverage existing logic in Flink, such
>> as JobListener. However, this calculator does not only aim to check
>> for conflicts; it also aims to provide the calculation result to the
>> user before the job is actually deployed, in case there is any
>> unexpected configuration. It's a good point that we need to parse the
>> dynamic configs. I prefer to parse the dynamic configs and CLI
>> commands in bash instead of adding a hook in JobListener.
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang  wrote:
>> >
>> > Hi Yangze,
>> >
>> > Does this tool just parse the configuration in flink-conf.yaml? Maybe it
>> > could be done in JobListener [1] (we should enhance it by adding a hook
>> > before job submission), so that it could cover all the cases (e.g. parameters
>> > coming from the command line).
>> >
>> > [1] 
>> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
>> >
>> >
>> > Yangze Guo  于2020年3月30日周一 上午9:40写道:
>> >>
>> >> Hi, Yun,
>> >>
>> >> I'm sorry that it currently could not handle it. But I think it is a
>> >> really good idea and that feature would be added to the next version.
>> >>
>> >> Best,
>> >> Yangze Guo
>> >>
>> >> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
>> >> >
>> >> > Very interesting and convenient tool, just a quick question: could this 
>> >> > tool also handle deployment cluster commands like "-tm" mixed with 
>> >> > configuration in `flink-conf.yaml` ?
>> >> >
>> >> > Best
>> >> > Yun Tang
>> >> > 
>> >> > From: Yangze Guo 
>> >> > Sent: Friday, March 27, 2020 18:00
>> >> > To: user ; user-zh@flink.apache.org 
>> >> > 
>> >> > Subject: [Third-party Tool] Flink memory calculator
>> >> >
>> >> > Hi, there.
>> >> >
>> >> > In release-1.10, the memory setup of task managers has changed a lot.
>> >> > I would like to provide here a third-party tool to simulate and get
>> >> > the calculation result of Flink's memory configuration.
>> >> >
>> >> >  Although there is already a detailed setup guide[1] and migration
>> >> > guide[2] officially, the calculator could further allow users to:
>> >> > - Verify if there is any conflict in their configuration. The
>> >> > calculator is more lightweight than starting a Flink cluster,
>> >> > especially when running Flink on Yarn/Kubernetes. User could make sure
>> >> > their configuration is correct locally before deploying it to external
>> >> > resource managers.
>> >> > - Get all of the memory configurations before deploying. User may set
>> >> > taskmanager.memory.task.heap.size and taskmanager.memory.managed.size.
>> >> > But they also want to know the total memory consumption of Flink. With
>> >> > this tool, users could get all of the memory configurations they are
>> >> > interested in. If anything is unexpected, they would not need to
>> >> > re-deploy a Flink cluster.
>> >> >
>> >> > The repo link of this tool is
>> >> > https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
>> >> > BashJavaUtils.jar of Flink and ensures the calculation result is
>> >> > exactly the same as your Flink dist. For more details, please take a
>> >> > look at the README.
>> >> >
>> >> > Any feedback or suggestion is welcomed!
>> >> >
>> >> > [1] 
>> >> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
>> >> > [2] 
>> >> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html
>> >> >
>> >> > Best,
>> >> > Yangze Guo
>> >
>> >
>> >
>> > --
>> > Best Regards
>> >
>> > Jeff Zhang


Re: How to define DDL for dynamically nested fields in Flink SQL

2020-03-31 Post by Benchao Li
You can try defining the data field as a MAP type.

111 wrote on Tue, Mar 31, 2020 at 2:56 PM:

> Hi,
> We use StreamSets as our CDC tool, and the records it writes to Kafka have nested, variable structures, for example:
> {database:a, table: b, type:update, data:{a:1,b:2,c:3}}
> {database:a, table: c, type:update, data:{c:1,d:2}}
> How should the DDL be defined for this kind of data?
>
>
> Best,
> Xinghalo
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn