ElasticsearchSink 设置es 主分片数
hi, flink streaming(版本1.12.5) 写es的时候ElasticsearchSink.Builder发现没有设置配置的地方,比如要想设置 number_of_shards。哪位大佬知道,请教一下! ElasticsearchSink.BuilderesSinkBuilder=newElasticsearchSink.Builder<>(httpHosts,newElasticsearchSinkFunction(){publicIndexRequestcreateIndexRequest(Stringelement){Mapjson=newHashMap<>();json.put("data",element);returnRequests.indexRequest().index("my-index").type("my-type").source(json);}@Overridepublicvoidprocess(Stringelement,RuntimeContextctx,RequestIndexerindexer){indexer.add(createIndexRequest(element));}}) | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制
回复: Re: 怎样从flink执行计划json生成StreamGraph?
Sql作业好像不支持修改每个算子并行度吧,修改并行度需要从头开始重新生成JobGraph提交作业。 Json主要是贴到Plan Visualizer 开发和调试用。 https://flink.apache.org/visualizer/ 从 Windows 版邮件发送 发件人: yidan zhao 发送时间: 2022年11月30日 10:12 收件人: user-zh@flink.apache.org 主题: Re: Re: 怎样从flink执行计划json生成StreamGraph? 好吧,sql我具体不了解,我用的stream api比较多,我了解是stream api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。 casel.chen 于2022年11月30日周三 00:16写道: > > 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? > > > > > > > > > > > > > > > > > > 在 2022-11-29 10:07:40,"yidan zhao" 写道: > >并不需要从执行计划json生成streamGraph呀~ > >streamGraph提交之前直接转jobGraph。 > > > >casel.chen 于2022年11月28日周一 08:53写道: > >> > >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
Re: Re: 怎样从flink执行计划json生成StreamGraph?
好吧,sql我具体不了解,我用的stream api比较多,我了解是stream api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。 casel.chen 于2022年11月30日周三 00:16写道: > > 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? > > > > > > > > > > > > > > > > > > 在 2022-11-29 10:07:40,"yidan zhao" 写道: > >并不需要从执行计划json生成streamGraph呀~ > >streamGraph提交之前直接转jobGraph。 > > > >casel.chen 于2022年11月28日周一 08:53写道: > >> > >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
Re:Re: 怎样从flink执行计划json生成StreamGraph?
如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? 在 2022-11-29 10:07:40,"yidan zhao" 写道: >并不需要从执行计划json生成streamGraph呀~ >streamGraph提交之前直接转jobGraph。 > >casel.chen 于2022年11月28日周一 08:53写道: >> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
Re: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
> On Nov 4, 2022, at 2:34 PM, 左岩 <13520871...@163.com> wrote: > > tenv.executeSql("xxx); > env.execute(); 这样使用是不对的,你可以看下这两个方法的java doc 祝好, Leonard
Re: flink sql接cdc数据源按最新数据统计问题
> On Nov 29, 2022, at 8:32 AM, casel.chen wrote: > > 业务需求是mysql订单表按天按供应商实时统计交易金额,订单表会发生修改和删除,用flink > sql要如何实现呢?开窗取最新一条记录再聚合吗?如果遇到delete记录会不会减去相应的price呢?试着写了如下flink sql不知道对不对 会的,可以看下flink sql相关的原理文章,百度/谷歌一搜一大把。 祝好 Leonard > > > select > s.biddate, > s.supplier, > sum(s.price) > from > ( >select > * >from > ( >select > biddate, > supplier, > price, > ROW_NUMBER() OVER ( >PARTITION BY biddate, >supplier >ORDER BY > bidtime DESC > ) as rownum >from > ( >select > bidtime, > date_format(bidtime, '-MM-dd-HH') as biddate, > supplier, > price >from > orders > ) > ) as t >where > t.rownum = 1 > ) as s > group by > s.biddate, > s.supplier > ; >