ElasticsearchSink 设置es 主分片数

2022-11-29 文章 allanqinjy


hi,
flink streaming(版本1.12.5) 写es的时候ElasticsearchSink.Builder发现没有设置配置的地方,比如要想设置 
number_of_shards。哪位大佬知道,请教一下!


ElasticsearchSink.BuilderesSinkBuilder=newElasticsearchSink.Builder<>(httpHosts,newElasticsearchSinkFunction(){publicIndexRequestcreateIndexRequest(Stringelement){Mapjson=newHashMap<>();json.put("data",element);returnRequests.indexRequest().index("my-index").type("my-type").source(json);}@Overridepublicvoidprocess(Stringelement,RuntimeContextctx,RequestIndexerindexer){indexer.add(createIndexRequest(element));}})
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制



回复: Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-29 文章 仙路尽头谁为峰
Sql作业好像不支持修改每个算子并行度吧,修改并行度需要从头开始重新生成JobGraph提交作业。
Json主要是贴到Plan Visualizer 开发和调试用。
https://flink.apache.org/visualizer/
从 Windows 版邮件发送

发件人: yidan zhao
发送时间: 2022年11月30日 10:12
收件人: user-zh@flink.apache.org
主题: Re: Re: 怎样从flink执行计划json生成StreamGraph?

好吧,sql我具体不了解,我用的stream api比较多,我了解是stream
api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。

casel.chen  于2022年11月30日周三 00:16写道:
>
> 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-11-29 10:07:40,"yidan zhao"  写道:
> >并不需要从执行计划json生成streamGraph呀~
> >streamGraph提交之前直接转jobGraph。
> >
> >casel.chen  于2022年11月28日周一 08:53写道:
> >>
> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教



Re: Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-29 文章 yidan zhao
好吧,sql我具体不了解,我用的stream api比较多,我了解是stream
api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。

casel.chen  于2022年11月30日周三 00:16写道:
>
> 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-11-29 10:07:40,"yidan zhao"  写道:
> >并不需要从执行计划json生成streamGraph呀~
> >streamGraph提交之前直接转jobGraph。
> >
> >casel.chen  于2022年11月28日周一 08:53写道:
> >>
> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教


Re:Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-29 文章 casel.chen
如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗?

















在 2022-11-29 10:07:40,"yidan zhao"  写道:
>并不需要从执行计划json生成streamGraph呀~
>streamGraph提交之前直接转jobGraph。
>
>casel.chen  于2022年11月28日周一 08:53写道:
>>
>> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教


Re: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里

2022-11-29 文章 Leonard Xu


> On Nov 4, 2022, at 2:34 PM, 左岩 <13520871...@163.com> wrote:
> 
> tenv.executeSql("xxx);
> env.execute();


这样使用是不对的,你可以看下这两个方法的java doc

祝好,
Leonard

Re: flink sql接cdc数据源按最新数据统计问题

2022-11-29 文章 Leonard Xu


> On Nov 29, 2022, at 8:32 AM, casel.chen  wrote:
> 
> 业务需求是mysql订单表按天按供应商实时统计交易金额,订单表会发生修改和删除,用flink 
> sql要如何实现呢?开窗取最新一条记录再聚合吗?如果遇到delete记录会不会减去相应的price呢?试着写了如下flink sql不知道对不对


会的,可以看下flink sql相关的原理文章,百度/谷歌一搜一大把。

祝好
Leonard


> 
> 
> select 
>  s.biddate, 
>  s.supplier, 
>  sum(s.price) 
> from 
>  (
>select 
>  * 
>from 
>  (
>select 
>  biddate, 
>  supplier, 
>  price, 
>  ROW_NUMBER() OVER (
>PARTITION BY biddate, 
>supplier 
>ORDER BY 
>  bidtime DESC
>  ) as rownum 
>from 
>  (
>select 
>  bidtime, 
>  date_format(bidtime, '-MM-dd-HH') as biddate, 
>  supplier, 
>  price 
>from 
>  orders
>  )
>  ) as t 
>where 
>  t.rownum = 1
>  ) as s 
> group by 
>  s.biddate, 
>  s.supplier
> ;
>