Re: Flink工程停止问题

2021-11-17 文章 zhisheng
老版本我们是自己通过重新修改页面 js 代码去掉的,

zhisheng  于2021年11月18日周四 上午11:44写道:

> web.cancel.enable: false
>
> web.cancel.enable 这个参数可以控制是否显示那个取消按钮
>
> Caizhi Weng  于2021年11月16日周二 下午3:53写道:
>
>> Hi!
>>
>> Flink 本身不自带安全机制,需要通过外部系统完成访问限制。
>>
>> 疾鹰击皓月 <1764232...@qq.com.invalid> 于2021年11月16日周二 下午2:57写道:
>>
>> > 您好
>> >
>> > Flink
>> >
>> WebUI的左上角有一个cancel按钮,通过按钮可以停止Flink工程。但这会导致一定的权限问题。我们希望只有特定人员可以停止Flink工程,请问有没有方法可以让那个停止按钮不生效或者不显示呢?
>>
>


Re: Flink工程停止问题

2021-11-17 文章 zhisheng
web.cancel.enable: false

web.cancel.enable 这个参数可以控制是否显示那个取消按钮

Caizhi Weng  于2021年11月16日周二 下午3:53写道:

> Hi!
>
> Flink 本身不自带安全机制,需要通过外部系统完成访问限制。
>
> 疾鹰击皓月 <1764232...@qq.com.invalid> 于2021年11月16日周二 下午2:57写道:
>
> > 您好
> >
> > Flink
> >
> WebUI的左上角有一个cancel按钮,通过按钮可以停止Flink工程。但这会导致一定的权限问题。我们希望只有特定人员可以停止Flink工程,请问有没有方法可以让那个停止按钮不生效或者不显示呢?
>


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-04 文章 zhisheng


Daisy Tsang  于2021年11月3日周三 上午9:36写道:

> Hey everyone, we have a new two-part post published on the Apache Flink
> blog about the sort-based blocking shuffle implementation in Flink.  It
> covers benchmark results, design and implementation details, and more!  We
> hope you like it and welcome any sort of feedback on it. :)
>
>
> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>


Re: Re: 回复:回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-11-04 文章 zhisheng
考虑 currentOffsets 吧

杨浩  于2021年10月27日周三 下午5:40写道:

> 明白这个逻辑,这个就导致consumer
> lag值不能反映真实情况,而很难监控系统延迟一个场景:业务状态很大,5分钟保存一次,QPS在1~100之间波动,那么需要配置延迟大于5*60*100来监控系统,这会导致监控非常不准确
> 在 2021-10-27 17:34:13,"Qingsheng Ren"  写道:
> >你好!
> >
> >如果使用的是基于 FLIP-27 实现的 KafkaSource,可以配置 enable.auto.commit = true 和
> auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交
> offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在
> checkpoint 时提交位点。
> >
> >--
> >Best Regards,
> >
> >Qingsheng Ren
> >Email: renqs...@gmail.com
> >On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
> >> 请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
> >> 在 2021-10-25 21:58:28,"杨浩"  写道:
> >> > currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
> >> > 在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
> >> > > Hi!
> >> > >
> >> > > 这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取
> offset,可以通过
> >> > > metrics 读取,见 [1]。
> >> > >
> >> > > [1]
> >> > >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
> >> > >
> >> > > 杨浩  于2021年10月25日周一 上午10:20写道:
> >> > >
> >> > > >
> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度
>


Re: flink-yarn的pre-job模式

2021-11-04 文章 zhisheng
可以检查两个:

1、yarn 队列是否资源足够,如果不够可能是资源的问题

2、检查作业本身是否有包冲突?

Shuiqiang Chen  于2021年10月27日周三 上午10:04写道:

> 你好,
>
> 上传的图片无法加载。 这种情况是 yarn 无法提供拉起taskmanager,检查下yarn资源是否充足?
>
> 王健 <13166339...@163.com> 于2021年10月26日周二 下午7:50写道:
>
> > 您好:
> >   我部署flink yarn的pre-job模式运行报错,麻烦看看是啥原因,非常感谢。
> >
> >  1.运行命令:/usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c
> > com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar
> >  提交正常,如图:
> >
> >  2.yarn 截图
> >
> >
> > 3.flink截图:
> >   现象:taskslot和taskmanager数量都为0,一直在申请
> >
> >
> >  4.最后结果:报错如下
> > 2021-10-25 16:17:49
> > java.util.concurrent.CompletionException:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Slot request bulk is not fulfillable! Could not allocate the required
> slot
> > within slot request timeout
> > at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> > at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> > at
> >
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> > at
> >
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at
> >
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)
> > at
> >
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)
> > at
> >
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at akka.actor.Actor.aroundReceive(Actor.scala:517)
> > at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Slot request bulk is not fulfillable! Could not allocate the required
> slot
> > within slot request timeout
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> > ... 26 more
> > Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> > 30 ms
> > ... 27 more
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
>


Re: Flink没有Operator级别的数据量Metrics

2021-11-04 文章 zhisheng
webui 有 operator 级别的,仔细看看

Ada Luna  于2021年10月26日周二 下午4:08写道:

> Web-UI中的就是Flink原生正常的Metrics,都是Task级别
>
> xiazhl  于2021年10月26日周二 下午2:31写道:
> >
> > web-ui里面有metrics
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> "user-zh"
>   <
> gfen...@gmail.com;
> > 发送时间:2021年10月26日(星期二) 中午1:55
> > 收件人:"user-zh" >
> > 主题:Flink没有Operator级别的数据量Metrics
> >
> >
> >
> >
> Flink只能看到Task级别的流入流出数据量,而没有Operator级别的。这个是出于性能考量吗?未来会加入一个开关,可以看到Operator级别的,方便debug吗?
>


Re: flink时态表:两个Hbase左关联有报错情况

2021-10-13 文章 zhisheng
是不是加了 'lookup.async' = 'true',当 rowkey 为 null 的时候会出现这个问题

https://issues.apache.org/jira/browse/FLINK-24528

Michael Ran  于2021年7月23日周五 上午10:44写道:

> java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils缺jar
> 在 2021-07-14 09:39:53,"xie_guo...@163.com"  写道:
> >您好,有关flinkSQL时态表左关联时遇到了问题。
> >具体场景:
> >
> 两个Hbase表做时态表关联,左表定义了处理时间,右表原本不变(rowkey,cf),FLINK版本1.13.0,报错情况如下,麻烦问一下大家又遇到类似问题嘛,该怎么处理!
> >
> >2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task
> --- 2021-07-14 09:22:20.596 WARN
> org.apache.flink.runtime.taskmanager.Task  ---
> LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code,
> data1, data2, p, $f4, code0, data]) -> Calc(select=[code,
> ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p,
> EXPR$4]) (3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING
> to FAILED with failure cause: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
> >at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
> >at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >at java.lang.Thread.run(Thread.java:748)
> >Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >at
> org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
> >at
> org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
> >at LookupFunction$3.close(Unknown Source
> >
>
> >ps:同样的代码,左表换成Kafka,能够正常运行。网上搜索了一下相关报错,好像时缺包,目前pom文件是有hbase-client、hbase-commen、flink-sql-connector-hbase。
> >
> >
> >
> >Sincerely,
> >xie_guo...@163.com
>


Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-08-02 文章 zhisheng
可以自定义 KafkaAppender,然后可以从 System.getProperty("log.file")
获取你要的信息维度数据,比如这个可以提取到作业 application id,container_id,是 jobmanager 还是
taskmanager,还可以根据配置只提取想要的级别日志,最后将打的依赖放到 lib 目录下即可

yujianbo <15205029...@163.com> 于2021年6月15日周二 下午7:34写道:

> 最新详细配置,可以看看我的博客:
> https://blog.csdn.net/weixin_44500374/article/details/117931457
> 如果对你有帮助帮忙点个赞~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧?

Wei JI10 季伟  于2021年6月28日周一 下午12:19写道:

> 您好,
> 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么?
>
> 在 2021/6/28 上午11:59,“Jingsong Li” 写入:
>
> 注意:此封邮件来自于公司外部,请注意信息安全!
> Attention: This email comes from outside of the company, please pay
> attention to the information security!
>
> Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?
>
> BEST,
> Jingsong
>
> On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟  >
> wrote:
>
> > 您好,
> > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
> >
> >
>
> --
> Best, Jingsong Lee
>
>


Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
使用的是 sql client 测试的 sql 吗?如果是的话,记得在 flink lib 目录下添加 flink-sql-parquet jar
包,然后重启集群和 sql client

Wei JI10 季伟  于2021年6月28日周一 上午9:35写道:

> 您好,
> 添加的parquet 依赖如下,不知道全不全
> 
> org.apache.flink
> flink-parquet_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.parquet
> parquet-avro
> 1.10.1
> 
> 
>
>
>


Re: 回复:flink 1.12如何实现window topN功能

2021-06-27 文章 zhisheng
可以将 1.13 的这个功能打在 flink 1.12 上面,然后引用你们新打的依赖

casel.chen  于2021年6月23日周三 下午12:08写道:

> -- source
> CREATE TABLE tmall_item(
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE,
> proctime AS PROCTIME(),
> WATERMARK FOR onSellTime AS onSellTime - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'filesystem' ,
> 'path' = 'file:///path/to/over-window.csv',
> 'format' = 'csv'
> );
>
> -- sink
> CREATE TABLE print_table (
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE
> ) WITH (
> 'connector' = 'print'
> );
>
> -- insert
> INSERT INTO print_table
> SELECT itemID,
> itemType,
> onSellTime,
> price
> FROM (
> SELECT itemID,
> itemType,
> onSellTime,
> price,
> ROW_NUMBER() OVER (
> PARTITION BY itemID, DATE_FORMAT(proctime, 'MMddHHmm')
> ORDER BY onSellTime DESC
> ) AS row_num
> FROM tmall_item
>  ) WHERE row_num = 1;
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-06-23 11:06:39,"杨光跃"  写道:
> >应该是这样吧
> >
> >
> >1. 第一步以主键group by 以及分时间窗口
> >SELECT 主键, TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, FROM
> source_event  group by TUMBLE(ts, INTERVAL '10' SECOND), 主键
> >
> >
> >2. 根据上一步的结果取top5
> >select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY
> 处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5
> >
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 10:58,casel.chen 写道:
> >你指的是TopN吗?
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
> >但我想知道window topN写法,跟这个还不一样。
> >我的需求是:
> >cdc场景同一个主键数据变更频繁,我想定义一个5秒处理时间窗口,在该窗口内取同一主键最新变更记录。用flink sql 1.12如何实现?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-06-23 10:18:01,"杨光跃"  写道:
> >
> >
> >Apache Flink 1.12 Documentation: Queries
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 10:09,casel.chen 写道:
> >请不要截图哦
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-06-23 09:47:46,"杨光跃"  写道:
> >
> >1.12也支持的
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 09:45,casel.chen 写道:
> >官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
>


Re: Flink 1.11版本LeaseRenewer线程不释放

2021-05-13 文章 zhisheng
你好,这个问题后来定位到问题了吗?

我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取
Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪

https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg

zilong xiao  于2020年12月8日周二 下午6:21写道:

> 作业数据流是 kafka -> flink ->
> http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~
>
> Paul Lam  于2020年12月8日周二 下午6:00写道:
>
> > Hi,
> >
> > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
> >
> > Best,
> > Paul Lam
> >
> > > 2020年12月8日 11:03,zilong xiao  写道:
> > >
> > > Hi Paul,
> > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> > >
> >
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> > > cause。。
> > >
> > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
> > >
> > > Paul Lam  于2020年12月8日周二 上午10:45写道:
> > >
> > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
> > >>
> > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
> > >>
> > >> Best,
> > >> Paul Lam
> > >>
> > >>> 2020年12月7日 18:11,zilong xiao  写道:
> > >>>
> > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
> > Dump发现有很多名为LeaseRenewer
> > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> > >>>
> > >>> Flink version: 1.11
> > >>> State backend:filesystem
> > >>> checkpoint interval: 60s
> > >>
> > >>
> >
> >
>


Re: 回复:Flink 1.11 SQL可以支持kafka动态分区发现么?

2021-01-20 文章 zhisheng
1.11 文档里面没有,那么估计就不支持了,可以看下 1.12 如何实现的,然后把这个 patch 打在内部自己的 flink 版本里面

sunfulin  于2021年1月20日周三 下午2:53写道:

>
> 我看下这个源码,貌似是有这个参数。还不确定SQL ddl里配置会不会生效,回头验证下。
>
>
>
>
> --
> 发自我的网易邮箱手机智能版
> 
>
>
> - Original Message -
> From: "Shuai Xia" 
> To: user-zh , sunfulin0321  >
> Sent: Wed, 20 Jan 2021 14:42:36 +0800
> Subject: 回复:Flink 1.11 SQL可以支持kafka动态分区发现么?
>
> Hi,看下FlinkKafkaConsumerBase内有没有使用,有的话就是支持的
>
>
> --
> 发件人:sunfulin 
> 发送时间:2021年1月20日(星期三) 14:40
> 收件人:user-zh 
> 主 题:Flink 1.11 SQL可以支持kafka动态分区发现么?
>
>
> hi,
>
> 各位大神,请教下,1.11的sql作业,如何能实现动态分区发现呐?我在1.12的文档里发现有个参数可以设置,但是1.11的版本里貌似没有。想确认下能否支持?
>
>
>
>
> --
> 发自我的网易邮箱手机智能版


Re: 设置状态存储位置后,job运行起来后找不到状态数据

2021-01-20 文章 zhisheng
你配置的是本地目录,不是 hdfs
目录,当重启后,可能新的任务运行的机器不是之前的那台机器了,那么之前作业的状态信息(在其他机器上)是不在新的机器上的,那么就会发现找不到状态文件,建议配置成
HDFS 的

Best
zhisheng

刘海  于2021年1月20日周三 下午9:05写道:

> Hi all
> 小弟遇到个问题期望大佬解答解答:
> 通过 env.setStateBackend(new
> RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据,
>
>
> flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到
> “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢?
>
>
> public class FlinkTestDemo {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(6);
> env.getConfig().setAutoWatermarkInterval(200);
> env.setStateBackend(new
> RocksDBStateBackend("file:///data/flink/checkpoints"));
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
> bsSettings);
>
> bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);
> CheckpointConfig config = env.getCheckpointConfig();
>
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofMinutes(5));
>
> Configuration configuration = bsTableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "6000");
> configuration.setString("table.exec.mini-batch.size", "5000");
>
> | |
> 刘海
> |
> |
> liuha...@163.com
> |
> 签名由网易邮箱大师定制


Re: flink1.12 on yarn per-job 运行问题

2021-01-20 文章 zhisheng
应该要提供一下 jm 的日志,麻烦检查一下 jm 里面的日志是否有异常的日志,我们遇到过类似的问题是因为包冲突导致的作业申请资源有问题,最后一直处于
created 状态

Best
zhisheng

花乞丐  于2021年1月21日周四 上午8:47写道:

> 贴一下提交程序的参数,以及你机器的配置,从上面看,是资源分配不够!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Sql Client读取Kafka报错:Could not find any factory for identifier 'kafka'

2021-01-11 文章 zhisheng
hello

你放 flink-sql-connector-kafka_2.11-1.11.3.jar  后有重启 sql client 和 集群吗?

Best
zhisheng

air23  于2021年1月11日周一 下午1:32写道:

> 下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-09 02:08:12,"inza9hi"  写道:
> >搜了下之前的邮件,貌似没有发现和我同样的问题。
> >
> >lib 下的Jar
> >flink-csv-1.11.3.jar
> >flink-table-blink_2.11-1.11.3.jar
> >flink-dist_2.11-1.11.3.jar
> >flink-table_2.11-1.11.3.jar
> >flink-jdbc_2.11-1.11.3.jar
>  log4j-1.2-api-2.12.1.jar
> >flink-json-1.11.3.jar  log4j-api-2.12.1.jar
> >flink-shaded-zookeeper-3.4.14.jar  log4j-core-2.12.1.jar
> >flink-sql-connector-elasticsearch6_2.11-1.11.3.jar
> >log4j-slf4j-impl-2.12.1.jar
> >flink-sql-connector-kafka_2.11-1.11.3.jar
> >mysql-connector-java-5.1.48.jar
> >
> >flink bin/sql-client.sh embedded
> >
> >CREATE TABLE user_behavior (
> >user_id BIGINT,
> >item_id BIGINT,
> >category_id BIGINT,
> >behavior STRING,
> >ts TIMESTAMP(3),
> >proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
> >WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> >在ts上定义watermark,ts成为事件时间列
> >
> >) WITH (
> >'connector' = 'kafka',  -- 使用 kafka connector
> >'topic' = 'data_test',  -- kafka topic
> >'startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> >'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
> >'format' = 'json'  -- 数据源格式为 json
> >);
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: crontab通过脚本启动flink-job失败,flink-sql-parquet_2.11-1.12.0.jar does not exist

2021-01-05 文章 zhisheng
hi

可以检查一下提交任务的 flink 客户端的 lib 目录下面是否有 flink-sql-parquet_2.11-1.12.0.jar 依赖

Best
zhisheng

冯嘉伟 <1425385...@qq.com> 于2021年1月4日周一 上午9:58写道:

> hi!
>
> java.io.FileNotFoundException: File file:/home/xjia/.flink/...
> 可以看出,从本地加载jar包,而不是hdfs。
>
> 我觉得可能是hadoop环境的问题,导致读取的scheme是file,使用 echo $HADOOP_CLASSPATH 检查你的环境。
>
> Important Make sure that the HADOOP_CLASSPATH environment variable is set
> up
> (it can be checked by running echo $HADOOP_CLASSPATH). If not, set it up
> using
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: yarn.provided.lib.dirs在flink1.11 yarn提交不生效

2020-12-24 文章 zhisheng
hi

使用 -Dyarn.provided.lib.dirs 试试

Best
zhisheng

datayangl  于2020年12月22日周二 下午4:56写道:

>
>
> flink1.11 on yarn模式,我提前将flink
> lib下的依赖及自定义函数jar上传到hdfs上,提交时使用yarn.provided.lib.dirs
> 指定hdfs的依赖路径。原本设想程序中使用反射去寻找自定义函数的类并且实例化,但是提交时报错,程序并没有找到自定义函数的路径
>
> 提交命令:/usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test -yD
> yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
>
> 相关信息如下:
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> -- class path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: object com.ly.third.udf.flink.SortKey not found.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: scala.ScalaReflectionException: object
> com.ly.third.udf.flink.SortKey not found.
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> at
>
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> at com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.12 RocksDBStateBackend 报错

2020-12-17 文章 zhisheng
hi,xintong

有对应的 Issue ID 吗?

Xintong Song  于2020年12月17日周四 下午4:48写道:

> 确实是 1.12.0 的 bug。
> 我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
> ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
> 我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:
>
> > 1.12设置 env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > true))之后会报错
> > :
> > Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> > allocate should not be 0. Please make sure that all types of managed
> memory
> > consumers contained in the job are configured with a non-negative weight
> > via
> > `taskmanager.memory.managed.consumer-weights`.
> >
> > 但查看源码这个参数是默认值。
> > 最终找到原因是
> > Streamconfig下getManagedMemoryFractionOperatorUseCaseOfSlot中
> > config缺少key : managedMemFraction.STATE_BACKEND
> > 当设置
> > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > 后,程序正常。
> > 代码如下
> > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: 使用flink sql cli读取postgres-cdc时,Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 文章 zhisheng
sql client 也得重启

王敏超  于2020年12月9日周三 下午4:49写道:

> 在使用standalone模式,并启动sql
> cli后,报错如下。但是我的lib目录是引入了flink-sql-connector-postgres-cdc-1.1.0.jar,
> 并且重启过集群。同样方式使用mysql cdc是可以的。
>
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'postgres-cdc' that implements
> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> classpath.
>
> Available factory identifiers are:
>
> blackhole
> jdbc
> kafka
> print
> ---
>
> 所以我是那里没配置对?
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink11 SQL 如何支持双引号字符串

2020-12-08 文章 zhisheng
是跟这个 Issue 有关吗?https://issues.apache.org/jira/browse/FLINK-20537

赵一旦  于2020年12月9日周三 上午10:17写道:

> MARK,学习下。等回复。
>
> 莫失莫忘  于2020年12月8日周二 下午6:49写道:
>
> > 我在迁移hive sql 到 flink引擎。原来的很多 hive sql 中
> > 字符串都是用双引号表示,例如select * from table1 where column1 =
> > "word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串。
> > ps:我看到flink SQL中字符串都必须用 单引号,例如select * from table1 where column1 =
> > 'word' 。如何使 字符串 既可以是单引号 也可以是 双引号呢
>


Re: 修改topic名称后从Savepoint重启会怎么消费Kafka

2020-12-02 文章 zhisheng
这个是正解,参考之前提的一个 Issue https://issues.apache.org/jira/browse/FLINK-16865

Best
zhisheng

Shuai Xia  于2020年12月2日周三 下午2:03写道:

>
> hi,实时上并不是你说的这样,从sp重启时因为存在RestoreState,而且Topic名称被修改,会导致restoredState内找不到新的KafkaTopicPartition
> 新的消费位置会置为EARLIEST_OFFSET
>
>
> if (restoredState != null) {
>for (KafkaTopicPartition partition : allPartitions) {
>   if (!restoredState.containsKey(partition)) {
>  restoredState.put(partition,
> KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>   }
>}
>
>
>
>
> --
> 发件人:熊云昆 
> 发送时间:2020年12月1日(星期二) 22:57
> 收件人:user-zh ; Shuai Xia 
> 主 题:Re:修改topic名称后从Savepoint重启会怎么消费Kafka
>
>
> 可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来
>
>
>
>
>
> 在 2020-12-01 20:59:48,"Shuai Xia"  写道:
> >
> >Hi,大佬们
> >突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
> >会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
> >可以手动控制么?
>
>
>


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
不需要,设置用户名和密码就行

Best
zhisheng

HunterXHunter <1356469...@qq.com> 于2020年12月1日周二 上午9:46写道:

> 你说的是es的 xpack 认证吗,需要你载入certificate文件是吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink-1.11.2 job启动不起来,

2020-11-30 文章 zhisheng
hi,正超

建议把作业的日志发一下?

Best
zhisheng

神奇哥哥 <759341...@qq.com> 于2020年12月1日周二 上午9:38写道:

> 你好,此问题我也遇到。目前已解决。
> 解决办法:
> 查看你pom文件中是否引入了hadoop相关依赖,Flink 1.11需要把hadoop相关依赖注释掉。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
1.12 支持了,参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/elasticsearch.html#username

Kyle Zhang  于2020年12月1日周二 上午9:35写道:

> Hi,你说的是这个问题么
>
> https://issues.apache.org/jira/browse/FLINK-16788
>
> On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com  wrote:
>
> > 看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗?
> > 除了用api之外。
> >
> > 感谢!
> >
> >
> >
> > cljb...@163.com
> >
>


Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-18 文章 zhisheng
是不是有 kafka 机器挂了?

Best
zhisheng

hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:

> 感觉还有其它 root cause,可以看下还有其它日志不?
>
>
> Best,
> Hailong
>
> At 2020-11-18 15:52:57, "赵一旦"  wrote:
> >2020-11-18 16:51:37
> >org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> >Partition
> b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> >not found.
> >at org.apache.flink.runtime.io.network.partition.consumer.
> >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> >at org.apache.flink.runtime.io.network.partition.consumer.
>
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> >at org.apache.flink.runtime.io.network.partition.consumer.
> >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> >at org.apache.flink.runtime.io.network.partition.consumer.
>
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> >)
> >at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> >.java:670)
> >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> >CompletableFuture.java:646)
> >at java.util.concurrent.CompletableFuture$Completion.run(
> >CompletableFuture.java:456)
> >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> >ForkJoinExecutorConfigurator.scala:44)
> >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> >.java:1339)
> >at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> >.java:107)
> >
> >
> >请问这是什么问题呢?
>


Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 文章 zhisheng
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。

Best
zhisheng

huang botao  于2020年11月18日周三 下午10:34写道:

> 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
>
>
> On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com> wrote:
>
> > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> >
> > 在 2020-11-18 15:29:54,"huang botao"  写道:
> > >Hi ,请教一个奇怪的问题:
> > >
> > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > >
> > >.assignTimestampsAndWatermarks(new
> > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > >
> > >.connect(ruleConfigSource)
> > >.process(new MetricDataFilterProcessFunction())
> > >.keyBy((KeySelector) metric -> {
> > >MetricDataKey metricDataKey = new MetricDataKey();
> > >metricDataKey.setDomain(metric.getDomain());
> > >metricDataKey.setStationAliasCode(metric.getStaId());
> > >metricDataKey.setEquipMK(metric.getEquipMK());
> > >metricDataKey.setEquipID(metric.getEquipID());
> > >metricDataKey.setMetric(metric.getMetric());
> > >return metricDataKey;
> > >})
> > >
> > >.window(SlidingEventTimeWindows.of(Time.seconds(2),
> Time.seconds(1)))
> > >.apply(new RichWindowFunction > >MetricDataKey, TimeWindow>() {
> > >@Override
> > >public void apply(MetricDataKey tuple, TimeWindow window,
> > >Iterable input, Collector out) throws
> > >Exception {
> > >input.forEach(x->{
> > >System.out.println("--->>>"+x);
> > >});
> > >}
> > >})
> > >
> > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x);
> > >
> > >
> > >数据一直在消费着,没有任何报错信息
> >
>


Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-13 文章 zhisheng
看完还是没有解决方案啊

JasonLee <17610775...@163.com> 于2020年11月13日周五 下午4:10写道:

> hi
> 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-12 文章 zhisheng
hello, Yang Wang

这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗?

Best,
zhisheng

Yang Wang  于2020年9月3日周四 上午11:15写道:

> 目前HA模式下,application模式还不能支持多job,所以就固定是0了
> 主要的原因是recover的问题还没有解决好
>
>
> Best,
> Yang
>
> chenkaibit  于2020年9月2日周三 下午7:29写道:
>
> > hi:
> >  我在测试 flink-1.11 application 模式时发现 开启 HA 后 jobID 总是
> > ;关闭 HA 后是个随机字符(和之前版本相同)。这个是个 bug
> 还是就是这么设计的?
> >  求大神解答。
> >
> > --
> >
> > Best, yuchuan
>


Re: ElasticsearchApiCallBridge相关类构造函数问题

2020-11-12 文章 zhisheng
目前在 master 分支已经支持了,可以去看看 flink-connector-es7 的源码

Luna Wong  于2020年11月11日周三 下午9:16写道:

> 为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。
> 我还想继承Elasticsearch6ApiCallBridge类。在new
> RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。
>
> 不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?
>


Re: Re: slot数量与并行度的大小关系

2020-11-12 文章 zhisheng
可以参考 http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/  文章理解一下

hl9...@126.com  于2020年11月12日周四 下午4:47写道:

> 是flink standalone 集群。
> job并行度是在job的java代码中通过  streamExecutionEnvironment.setParallelism(15) 来指定的。
>
>
>
> hl9...@126.com
>
> 发件人: Xintong Song
> 发送时间: 2020-11-12 13:18
> 收件人: user-zh
> 主题: Re: slot数量与并行度的大小关系
> 你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的?
>
> 流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3]
> 中的几种方式更改作业的并行度。另外,也可以通过配置 `taskmanager.numberOfTaskSlots` 来增加 flink 集群的
> slot 数量。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Nov 11, 2020 at 7:54 PM Shawn Huang  wrote:
>
> > Hi,
> >
> > Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。
> > 如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。
> >
> > 目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources
> > [3]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html
> >
> > Best,
> > Shawn Huang
> >
> >
> > hl9...@126.com  于2020年11月11日周三 下午2:58写道:
> >
> > > Hi,all:
> > > 我在flink
> > > web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:
> > > Caused by:
> > >
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > > Could not allocate the required slot within slot request timeout.
> > > Please make sure that the cluster has enough resources.
> > >
> > > 是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?
> > >
> > >
> > >
> > > hl9...@126.com
> > >
> >
>


Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 zhisheng
同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态

hdxg1101300123  于2020年11月12日周四 下午8:07写道:

> 可以设置检查点失败任务也失败
>
>
>
> 发自vivo智能手机
> > hi everyone,
> >
> > 最近在使用Flink-1.11.1 On Yarn Per
> Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> application仍处于运行状态
> >
> > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> >
> > best,
> > amenhub


Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 zhisheng
hi

可以看看 Timer 的机制,能不能解决你的问题

Best zhisheng

hailongwang <18868816...@163.com> 于2020年11月12日周四 下午5:25写道:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang  于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00 msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>


Re: Re: sql-cli执行sql报错

2020-11-05 文章 zhisheng
这个问题同样在最新的 master 分支也有这个问题,我建了一个 Issue 描述了下整个流程
https://issues.apache.org/jira/browse/FLINK-19995

hl9...@126.com  于2020年9月28日周一 下午6:06写道:

> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>


Re: 使用flink-CDC checkpoint超时问题

2020-11-02 文章 zhisheng
hi

我建议可以从两方面排查一下:

1、检查 checkpoint 的大小,是不是很大?

2、检查作业是否反压?反压的情况下 checkpoint 一般很难成功,这种情况可以先解决反压的问题。

Best
zhisheng

丁浩浩 <18579099...@163.com> 于2020年11月2日周一 下午4:08写道:

> 我刚调研flink sql cdc功能,我有一个这样的需求,就是三张CDC的表做left join关联,由于三张表都会变化,所以使用cdc来做。
> 前两张表数据每张大概20万条数据,最后一张表只有几十条数据,我是讲三张表关联之后做成宽表写入的mysql中。
> 每次数据写到10万条左右任务失败,查看了一下日志,是checkpoint超时造成的。状态后端用的rocksDB。我想问
> 数据量也不是很大,为什么checkpoint会超时呢,也就是10分钟没有完成checkpoint?
>
>


Re: 回复: flink任务挂掉后自动重启

2020-11-02 文章 zhisheng
平台层是将功能自动化,产品化,没有平台总可以手动按照这个思路去实现。

bradyMk  于2020年11月2日周一 下午2:27写道:

> 那我们没有这样的计算平台该怎么办呢?
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 回复: flink任务挂掉后自动重启

2020-11-01 文章 zhisheng
平台层的意思就是说:假设你们有实时计算平台,那么可以考虑把这个想法做在你们的平台里面。

bradyMk  于2020年11月2日周一 上午11:40写道:

> zhisheng大佬好~我不是很理解您说的平台层具体是什么意思,指的是什么。。。
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 elasticsearch connector

2020-11-01 文章 zhisheng
hi,

可以自己根据社区的代码进行重编译,改成自己公司的依赖名,推送自公司的 nexus。

Best
zhisheng

Yangze Guo  于2020年10月29日周四 下午4:00写道:

> 1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-18361
>
> Best,
> Yangze Guo
>
> On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
> >
> > elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql
> api如何加入账号认证?
>


Re: flink1.11 kafka connector

2020-11-01 文章 zhisheng
hi,

应该是可以继承 FlinkKafkaPartitioner 接口,自己重写 partition 方法实现 hash(key) 的功能

eg:

public class MyCustomPartitioner extends FlinkKafkaPartitioner> {

@Override
public int partition(Map map, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
String key = map.get(xxx).toString();
return partitions[Math.abs(key.hashCode() % partitions.length)];
}
}

Best!
zhisheng

Jark Wu  于2020年10月29日周四 下午2:33写道:

> 多谢创建 issue。
>
> side comment: 1.12 中 kafka connector 将支持声明 message key 部分,当声明了 message key
> 部分,就自动会按照 key 来做 hash 到某个固定分区。
>
> Best,
> Jark
>
> On Thu, 29 Oct 2020 at 14:27, Dream-底限  wrote:
>
> > hi、
> > 好的,https://issues.apache.org/jira/browse/FLINK-19871
> >
> > Jark Wu  于2020年10月29日周四 下午12:06写道:
> >
> > > 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
> > >
> > > > hi、
> > > > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > > > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> > > >
> > > >- fixed:每个Flink分区最多只能有一个Kafka分区。
> > > >- round-robin:Flink分区循环分配给Kafka分区。
> > > >
> > >
> >
>


Re: 回复: flink任务挂掉后自动重启

2020-11-01 文章 zhisheng
hi,

提供一个方案,平台层可以做到作业自动拉起,那么需要平台层有这些信息。

1、作业启动的时候保存一下作业的 jobid 信息

2、平台轮训检测作业的状态,如果作业挂了,直接从配置的 checkpoint 基础路径 + jobid 目录下去找最新的目录(里面含
_metadata)

eg: hdfs:/flink/checkpoints/9b4cddb385b0c5db96b0774769867673/chk-15109

然后平台层将作业从 checkpoint 拉起,当然这个是否自动拉起,拉起的次数可以让用户去选择。

Best!
zhisheng

bradyMk  于2020年10月31日周六 下午4:20写道:

> 好的,我去试试这种方法,感谢~
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 官方后续会有支持kafka lag metric的计划吗

2020-10-28 文章 zhisheng
hi, silence

对于你提到的第一种方案,我觉得在 flink 里面是做不到的,因为 flink 只可以拿得到消费数据的 offset 信息,但是拿不到 kafka
里面该 topic 具体分区最新的 offset 值,那么也就无法相减得到每个分区的 lag,从而无法获取整个 topic 的 lag。

对于第二种方案,我觉得是可行的,可以在自己作业里面埋点(当前系统时间与消费到的数据的事件时间的差值),然后每个并行度分别上报,最后监控页面可以看到作业分区延迟最大是多长时间。

Best!
zhisheng

silence  于2020年10月28日周三 下午7:55写道:

> 目前消费kafka会有lag的情况发生,因此想基于flink metric进行上报监控kakfa的消费延时情况
> 主要是两种情况:
> 1、当前group消费的offset和对应topic最大offset之间的差值,也就是积压的数据量
> 2、当前消费的最新记录的timestamp和系统时间之间的差值,也就是消费的时间延时
>
> kafka lag的监控对实时任务的稳定运行有着非常重要的作用,
> 网上也检索到了一些基于源码修改的实现,但修改源码的话也不利于后面flink版本的升级,还是希望官方可以考虑支持一下
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于并行下watermark不生成

2020-10-28 文章 zhisheng
hi,Benchen

可以考虑在 source 算子后面加一个 rebalance()

Best!
zhisheng

Shubin Ruan  于2020年10月28日周三 下午7:36写道:

> 可以考虑在数据源处进行处理:
>
>
> 设置个时间阈值,若检测到某个 key 下的数据超过时间阈值还未更新,则根据系统的 processing time 按照某种逻辑生成1条水印发送到下游。
> 在 2020-10-28 18:54:22,"BenChen"  写道:
> >Hi
> all,在Flink1.11里面新增了WatermarkStetagy来处理某个并行度下没有数据导致watermark不触发的问题,在1.10里面Flink有什么机制解决这个问题吗?谢谢
> >
> >
> >| |
> >BenChen
> >|
> >|
> >haibin...@163.com
> >|
> >签名由网易邮箱大师定制
> >
>


Re: Flink是否可以动态调整任务并行度

2020-10-28 文章 zhisheng
应该不支持

ZT.Ren <18668118...@163.com> 于2020年10月28日周三 下午3:53写道:

> 印象中,Flink1.9之后的某个版本支持动态调整并行度,但忘记怎么使用了。有没有哪位同学帮忙指点下,多谢


Re: flink1.11日志上报

2020-10-27 文章 zhisheng
弱弱的问一下,你们集群作业数量大概多少?因为用户可能打印原始数据在日志里面,这个数据量确实还是很大的,全部将日志打到 ES 每月需要多少成本啊?

Storm☀️  于2020年10月27日周二 下午8:37写道:

> 我们也是用的kafkaappender进行日志上报,然后在ES中提供日志检索
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pyflink和flink版本的兼容性问题

2020-10-22 文章 zhisheng
估计可能会有问题,很多变动

whh_960101  于2020年10月23日周五 上午11:41写道:

> Hi,各位大佬,
>  想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink
> 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12
> 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗


Re: Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi

既然你只能消费到一个分区的数据,那么可以肯定的是消费能拿到的只是一个分区的数据,另外看到你说

> 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)

建议看看是不是这个转发有问题,只转发了一个节点

Best
zhisheng

Lynn Chen  于2020年10月23日周五 上午11:01写道:

>
>
>
> hi, zhisheng:
>
>
> 我解析 json 后:
> (xxx, xxx, xxx, topic, partition, offset)
> =>
>
>
> (false,1603420582310,"INSERT","test3.order",2,75)
> (false,1603421312803,"INSERT","test3.order",2,76)
> (false,1603421344819,"INSERT","test3.order",2,77)
> (false,1603421344819,"INSERT","test3.order",2,78)
>
>
> 我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到
>
>
> 我的猜想:
>
>
> 我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)
>
>
> broker1 配置:
>
>
> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> broker2 配置:
>
>
> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
>
>
>
>
>
> broker3 配置:
>
>
> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> security.inter.broker.protocol=PLAINTEXT
>
>
> 本机连接kafka:
> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>
>
> 是跟这个配置有关吗?
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-23 08:37:14,"zhisheng"  写道:
> >hi
> >
> >如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
> >来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
> >
> >eg:
> >
> >  env.addSource(new FlinkKafkaConsumer011<>(
> >parameters.get("topic"),new
> >JSONKeyValueDeserializationSchema(true),
> >buildKafkaProps(parameters))).flatMap(new
> >FlatMapFunction() {
> >@Overridepublic void flatMap(ObjectNode jsonNodes,
> >Collector collector) throws Exception {
> >System.out.println(jsonNodes.get("value"));
> >System.out.println(jsonNodes.get("metadata").get("topic").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("offset").asText());
> >
> >System.out.println(jsonNodes.get("metadata").get("partition").asText());
> >   collector.collect(jsonNodes);
> >}}).print();
> >
> >Best
> >
> >zhisheng
> >
> >
> >Lynn Chen  于2020年10月23日周五 上午12:13写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >> hi,  Qijun Feng:
> >>
> >>
> >> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-04-03 09:27:52,"LakeShen"  写道:
> >> >Hi Qijun,
> >> >
> >> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
> >> >
> >> >Best,
> >> >LakeShen
> >> >
> >> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
> >> >
> >> >> Dear All,
> >> >>
> >> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >> >>  现在改成了所有地址,也换了 group.id
> >> >>
> >> >>
> >> >> Properties properties = new Properties();
> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >> >>
> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >> >>new FlinkKafkaConsumer010("behavior-logs_dev",
> new
> >> >> BehaviorLogDeserializationSchema(), properties);
> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >> >>
> >> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有
> partiton=1,或者
> >> 2
> >> >> 的,
> >> >>
> >> >> 2020-04-02 14:54:58,532 INFO
> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> >> Consumer subtask 0 creating fetcher with offsets
> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >> >>
> >> >>
> >> >> 是哪里有问题吗?
> >> >>
> >> >>
> >>
>


Re: Re: Flink 读取 Kafka 多个 Partition 问题,

2020-10-22 文章 zhisheng
hi

如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。

eg:

  env.addSource(new FlinkKafkaConsumer011<>(
parameters.get("topic"),new
JSONKeyValueDeserializationSchema(true),
buildKafkaProps(parameters))).flatMap(new
FlatMapFunction() {
@Overridepublic void flatMap(ObjectNode jsonNodes,
Collector collector) throws Exception {
System.out.println(jsonNodes.get("value"));
System.out.println(jsonNodes.get("metadata").get("topic").asText());

System.out.println(jsonNodes.get("metadata").get("offset").asText());

System.out.println(jsonNodes.get("metadata").get("partition").asText());
   collector.collect(jsonNodes);
}}).print();

Best

zhisheng


Lynn Chen  于2020年10月23日周五 上午12:13写道:

>
>
>
>
>
>
> hi,  Qijun Feng:
>
>
> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-03 09:27:52,"LakeShen"  写道:
> >Hi Qijun,
> >
> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。
> >
> >Best,
> >LakeShen
> >
> >Qijun Feng  于2020年4月2日周四 下午5:44写道:
> >
> >> Dear All,
> >>
> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >>  现在改成了所有地址,也换了 group.id
> >>
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >>
> >> FlinkKafkaConsumer010 kafkaConsumer010 =
> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
> >> BehaviorLogDeserializationSchema(), properties);
> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
> >>
> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
> 2
> >> 的,
> >>
> >> 2020-04-02 14:54:58,532 INFO
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> Consumer subtask 0 creating fetcher with offsets
> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >>
> >>
> >> 是哪里有问题吗?
> >>
> >>
>


Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 zhisheng
hi

flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持

 [1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:

>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 zhisheng
Hi Robin:

1、是不是更改了刷新时间?一直不显示吗?

2、running 的作业不会显示的,你可以之间在 yarn 查看,history server 应该是只提供展示挂掉的作业

PS:另外提几个 history server 的问题

1、挂掉的作业展示能否支持分页呢?目前直接在一个页面全部展示了历史所有的作业,打开会很卡

2、有办法可以查看挂掉作业的 jm 和 tm 日志吗?因为 HDFS
其实是有日志,按道理是可以拿到日志信息然后解析展示出来的,Spark history server 也是可以查看挂掉作业的日志


Best!
zhisheng

Robin Zhang  于2020年10月22日周四 下午6:11写道:

>
> 如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table
> api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。
> 使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per
> job方式提交到yarn运行的,只是多了个sql解析动作。不能理解
>
> ,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t447/%E5%8E%86%E5%8F%B2%E6%9C%8D%E5%8A%A1%E5%99%A8.png>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11.2webui界面中的任务详情页面获取不到任务的received和sent数据详情

2020-10-14 文章 zhisheng
想获取到的话其实可以通过 REST API 去如下图的 metrics 处获取作业 source 往下 send
的数据量和速度,不过这个是单个并行度的,可以去将每个并行度的累加起来。

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-060508.png

[image: image.png]

Best

zhisheng

Kevin Liu  于2020年10月14日周三 上午12:35写道:

> 可以参考 https://blog.csdn.net/weixin_41608066/article/details/108557869
> 。“目前flink
> sql是不支持source/sink并行度配置的,flink sql中各算子并行度默认是根据source的partition数或文件数来决定。”
> 如果想实现在设置 source 时指定并行度,可以参考该文章对源码做些修改。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:HistoryServer完成任务丢失的问题

2020-10-13 文章 zhisheng
hi,我使用 1.10 测试过,发现 history server 查到 cancel job 的时间比较长(超过默认的
10s),但是最终还是会出现的。

如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-033612.png

[image: image.png]

刘建刚  于2020年9月28日周一 下午4:13写道:

> 修复方案为:https://issues.apache.org/jira/browse/FLINK-18959
>
> xiao cai  于2020年9月27日周日 下午6:42写道:
>
> > 貌似是个bug,我的版本是1.11.0
> >
> >
> >
> >
> https://issues.apache.org/jira/browse/FLINK-18959?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20text%20~%20%22history%20server%22
> >
> >
> >  原始邮件
> > 发件人: xiao cai
> > 收件人: user-zh
> > 发送时间: 2020年9月27日(周日) 18:41
> > 主题: Re:Re:HistoryServer完成任务丢失的问题
> >
> >
> > 貌似是个bug
> >
> >
> >  原始邮件
> > 发件人: xiao cai
> > 收件人: user-zh
> > 发送时间: 2020年9月27日(周日) 18:31
> > 主题: Re:Re:HistoryServer完成任务丢失的问题
> >
> >
> > 是在history server中没有,但是yarn
> > logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history
> > server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael
> > Ran 收件人: user-zh 发送时间:
> > 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题
> 你的意思是,日志彻底消失了?完全找不到?
> > 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao
> > cai"  写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。
> > >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran<
> > greemqq...@163.com> >收件人: user-zh >发送时间:
> > 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history
> > 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai"  写道:
> > >Hi: >flink 1.11.0
> >
> >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history
> > server中却找不到这个任务。同时我尝试了再yarn中kill
> > application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history
> > server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.
>


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 zhisheng
如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业

Jeff Zhang  于2020年7月11日周六 下午11:23写道:

> Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
> api来做到的,对zeppelin感兴趣的话,可以参考这个视频
>
> https://www.bilibili.com/video/BV1Te411W73b?p=21
>
>
> jianxu  于2020年7月11日周六 下午4:52写道:
>
> > Hi:
> >
> >
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> > jobId)取消流任务。
> > Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> > https://github.com/todd5167/flink-spark-submiter
> > 项目的任务提交部分,取消任务时构建ClusterClient即可。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > | |
> > jianxu
> > |
> > |
> > rjia...@163.com
> > |
> >
> >
> >
> >
> > 在2020年07月11日 16:19,Congxian Qiu 写道:
> > Hi
> >
> > 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> > 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道
> > flink 的 JobId,接下来就是调用 Flink 的接口了
> >
> > 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
> >
> > Best,
> > Congxian
> >
> >
> > godfrey he  于2020年7月9日周四 上午10:08写道:
> >
> > 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> > JobClient 可以 cancel 作业,获取 job status。
> >
> > [1]
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> >
> > Best,
> > Godfrey
> >
> > Evan  于2020年7月9日周四 上午9:40写道:
> >
> > 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> > API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 zhisheng
有没有窗口啊?

Robin Zhang  于2020年7月14日周二 上午11:48写道:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: flink on yarn日志问题

2020-07-13 文章 zhisheng
知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看

Yangze Guo  于2020年7月14日周二 上午11:58写道:

> Hi, 王松
>
> 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
>
> Best,
> Yangze Guo
>
> On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
> >
> > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
> >
> > Yangze Guo  于2020年7月13日周一 下午5:03写道:
> >
> > > 1.
> > >
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> > > >
> > > > 不好意思  怪我灭有描述清楚
> > > > 1 目前开启日志收集功能
> > > > 2 目前已是 per-job模式
> > > > 3 集群使用cdh flink.1.10
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > > > >Hi,
> > > > >
> > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > > > >
> > > > >第二个问题,您可以尝试一下per-job mode [2][3]
> > > > >
> > > > >[1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > > >[2]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > > >[3]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > > >
> > > > >
> > > > >Best,
> > > > >Yangze Guo
> > > > >
> > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > > > >>
> > > > >> 请问一下两个问题
> > > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> > > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> > > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > > > >>
> > >
>


Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 zhisheng
hi,maqi

有完整的日志吗?在这个异常之前还有其他的异常信息吗?如果有,可以提供一下!

Best,
zhisheng

m...@sinoiov.com  于2020年7月9日周四 下午7:57写道:

>
> 请教各位:
> flink任务在本机写入测试环境kafka集群没问题,
>
> 但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka
>
> 异常信息如下:
>
> 2020-07-09 19:17:33,126 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from
> DEPLOYING to RUNNING.
> 2020-07-09 19:17:33,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from
> DEPLOYING to RUNNING.
> 2020-07-09 19:17:39,049 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- async wait
> operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched
> from RUNNING to FAILE
> D.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka:
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> at
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
>
>
>
>
>
>
>


Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-05 文章 zhisheng
生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian

Best!
zhisheng

Congxian Qiu  于2020年7月4日周六 下午3:21写道:

> @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>
> Best,
> Congxian
>
>
> zhisheng  于2020年7月4日周六 下午12:27写道:
>
> > 我们也有遇到过这个异常,但是不是很常见
> >
> > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >
> > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> > > Best,
> > > Congxian
> > >
> > >
> > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > > > >
> > > > >
> > > > >| |
> > > > >JasonLee
> > > > >|
> > > > >|
> > > > >邮箱:17610775...@163.com
> > > > >|
> > > > >
> > > > >Signature is customized by Netease Mail Master
> > > > >
> > > > >在2020年07月01日 20:43,程龙 写道:
> > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > > > >
> > > > >
> > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator
> > > > Filter -> Map (2/8).
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > > > >   at org.apache.flink.streaming.runtime.io
> > > >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > > > >   at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > > > >   at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > > > >   at java.lang.Thread.run(Thread.java:745)
> > > > >Caused by: java.lang.NullPointerException
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > > > >   at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> > > >
> > >
> >
>


Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-03 文章 zhisheng
我猜你是想要将 table name 作为一个标签方便后期分组查询过滤?

wangl...@geekplus.com.cn  于2020年7月3日周五 上午10:24写道:

> public void invoke(ObjectNode node, Context context) throws Exception {
>
> String tableName = node.get("metadata").get("topic").asText();
> Meter meter = getRuntimeContext().getMetricGroup().meter(tableName,
> new MeterView(10));
> meter.markEvent();
> log.info("### counter: " + meter.toString() + "\t" +
> meter.getCount());
>
> 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics.
> 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: kcz
> Send Time: 2020-07-03 09:13
> Receiver: wanglei2
> Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
> 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧
>
>
>
> -- 原始邮件 --
> 发件人: 王磊2 
> 发送时间: 2020年7月2日 21:46
> 收件人: user-zh , 17610775726 <17610775...@163.com>
> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>
>
> 没有明白你说的实现方式。
>
> 我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ...,
> myCounter_tableX
> 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。
>
> 谢谢,
> 王磊
>
>
>
> --
> 发件人:JasonLee <17610775...@163.com>
> 发送时间:2020年7月2日(星期四) 21:12
> 收件人:user-zh 
> 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
>
> 你把tablename传到下面metric里不就行了吗
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:
>
> 全都是同一种类型的 metrics.
> 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的
> metrics(但都是 meter 类型)
>
> 谢谢,
> 王磊
>
>
>
>
> wangl...@geekplus.com.cn
>
>
> 发件人: JasonLee
> 发送时间: 2020-07-02 16:16
> 收件人: user-zh
> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
> 是要生成不同类型的metric吗 比如counter meter ?
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:
>
> 官网上的例子:
>
> public class MyMapper extends RichMapFunction {
> private transient Counter counter;
> @Override
> public void open(Configuration config) {
>   this.counter = getRuntimeContext()
> .getMetricGroup()
> .counter("myCounter");
> }
> @Override
> public String map(String value) throws Exception {
>   this.counter.inc();
>   return value;
> }
> }
>
> 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>


Re: Flink job不定期就会重启,版本是1.9

2020-07-03 文章 zhisheng
我们集群一般出现这种异常大都是因为 Full GC 次数比较多,然后最后伴随着就是 TaskManager 挂掉的异常

Xintong Song  于2020年7月3日周五 上午11:06写道:

> 从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。
> 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jul 3, 2020 at 10:48 AM noon cjihg  wrote:
>
> > Hi,大佬们
> >
> > Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?
> >
> > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> > down.
> > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> > akka.remote.RemoteActorRefProvider$RemotingTerminator
> > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> > down.
> > 2020-07-01 20:20:43.891 [flink-metrics-16] INFO
> > org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC
> > service.
> > 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Terminating
> > cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
> > java.util.concurrent.CompletionException:
> > akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> > ms]. Message of type
> > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> > reason for `AskTimeoutException` is that the recipient actor didn't
> > send a reply.
> > at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> > at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> > at
> >
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> > at
> >
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at
> >
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
> > at akka.dispatch.OnComplete.internal(Future.scala:263)
> > at akka.dispatch.OnComplete.internal(Future.scala:261)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> > at
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > at
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> > at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> > ms]. Message of type
> > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> > reason for `AskTimeoutException` is that the recipient actor didn't
> > send a reply.
> > at
> > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> > at
> > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> > ... 9 common frames omitted
> >
>


Re: 如何在窗口关闭的时候清除状态

2020-07-03 文章 zhisheng
你试试在 clear 方法中清理

18579099...@163.com <18579099...@163.com> 于2020年7月3日周五 下午2:02写道:

>
> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢?
>
> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。
>
> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢?
>
>
>
> 18579099...@163.com
>


Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-03 文章 zhisheng
我们也有遇到过这个异常,但是不是很常见

Congxian Qiu  于2020年7月3日周五 下午2:08写道:

> 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> [1]  https://issues.apache.org/jira/browse/FLINK-17479
> Best,
> Congxian
>
>
> 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >
> >
> >
> >
> >
> > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> > >你到具体的tm上找到相关的operator看看是不是有异常信息
> > >
> > >
> > >| |
> > >JasonLee
> > >|
> > >|
> > >邮箱:17610775...@163.com
> > >|
> > >
> > >Signature is customized by Netease Mail Master
> > >
> > >在2020年07月01日 20:43,程龙 写道:
> > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> > >
> > >
> > >java.lang.Exception: Could not perform checkpoint 3201 for operator
> > Filter -> Map (2/8).
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> > >   at org.apache.flink.streaming.runtime.io
> >
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> > >   at org.apache.flink.streaming.runtime.io
> >
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> > >   at org.apache.flink.streaming.runtime.io
> > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> > >   at org.apache.flink.streaming.runtime.io
> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> > >   at org.apache.flink.streaming.runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> > >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> > >   at java.lang.Thread.run(Thread.java:745)
> > >Caused by: java.lang.NullPointerException
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> > >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)
> >
>


Re: flink1.9读取阿里Mq问题

2020-07-03 文章 zhisheng
hi,guanyq

社区版本的 Flink 应该默认没有和 RocketMQ 连接的 Connector,在 RocketMQ 的社区项目中看到和 Flink 整合的模块:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

你说的 AccessKey,SecretKey 参数应该是 ACL 权限校验,看了代码应该是不支持的,不过可以自己去进行扩展。

Best!
zhisheng

guanyq  于2020年7月3日周五 下午11:44写道:

> flink1.9读取阿里RocketMQ
> 如何设置AccessKey,SecretKey 参数
>
>
>
> finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)build();


Re: flinksql流计算任务非正常结束

2020-06-29 文章 zhisheng
是不是作业是一个批作业呀?

Yichao Yang <1048262...@qq.com> 于2020年6月29日周一 下午6:58写道:

> Hi
>
>
> 看你的日志你的数据源是hive table?可以看下是否是批作业模式而不是流作业模式。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"MuChen"<9329...@qq.com;
> 发送时间:2020年6月29日(星期一) 下午4:53
> 收件人:"user-zh"
> 主题:flinksql流计算任务非正常结束
>
>
>
> hi,大家好:
>
> 我启动了yarn-session:bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm
> fsql-clinbsp; 2gt;amp;1 amp;
>
> 然后通过sql-client,提交了一个sql:
>
> 主要逻辑是将一个kafka表和一个hive维表做join,然后将聚合结果写到mysql中。nbsp;
>
> 运行过程中,经常出现短则几个小时,长则几十个小时后,任务状态变为succeeded的情况,如图:
> https://s1.ax1x.com/2020/06/29/Nf2dIA.png
>
> 日志中能看到INFO级别的异常,15:34任务结束时的日志如下:
> 2020-06-29 14:53:20,260 INFO
> org.apache.flink.api.common.io.LocatableInputSplitAssigner
> - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29
> 14:53:22,845 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70,
> PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5
> 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29
> 15:34:52,982 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> - Shutting YarnSessionClusterEntrypoint down with application status
> SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Removing cache directory
> /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29
> 15:34:53,073 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29
> 15:34:53,074 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Shut down complete. 2020-06-29 15:34:53,074 INFO
> org.apache.flink.yarn.YarnResourceManager
> - Shut down cluster because application is in SUCCEEDED, diagnostics null.
> 2020-06-29 15:34:53,076 INFO
> org.apache.flink.yarn.YarnResourceManager
> - Unregister application from the YARN Resource Manager with final status
> SUCCEEDED. 2020-06-29 15:34:53,088 INFO
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
> - Waiting for application to be successfully unregistered. 2020-06-29
> 15:34:53,306 INFO
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
> - Closing components. 2020-06-29 15:34:53,308 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309
> INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Stopping dispatcher 
> akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
> 2020-06-29 15:34:53,310 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Stopping all currently running jobs of dispatcher
> akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29
> 15:34:53,311 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> - Stopping the JobMaster for job default: insert into
> rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFO
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl -
> Interrupted while waiting for queue
> java.lang.InterruptedException
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
> at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
> 2020-06-29 15:34:53,324 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy
> - Opening proxy : uhadoop-op3raf-core12:2
>
> 
> ps:nbsp;
>
> 1. kafka中一直有数据在写入的
> 2. flink版本1.10.0
> 请问,任务状态为什么会变为SUCCEEDED呢?
>
> 谢谢大家!
>
>
>
>
> 逻辑稍微有些复杂,可以忽略下面的sql代码:
> # -- 提供:5分钟起始时间、vid、vid_group、曝光次数、点击次数、播放次数、有效播放次数 --
> 每5分钟将近5分钟统计结果写入mysql insert into rt_app.app_video_cover_abtest_test
> select begin_time, vid, vid_group, max(dv),
> max(click), max(vv), max(effectivevv) from(
> select t1.begin_time begin_time, t1.u_vid
> vid, t1.u_vid_group vid_group, dv,
> click, vv, if(effectivevv is null,0,effectivevv)
> effectivevv from ( -- dv、click、vv
> select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE)
> AS STRING) begin_time, cast(u_vid as bigint)
> u_vid, u_vid_group,
> sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and
> u_c_module='M011',1,0)) dv,
> sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and
> u_c_module='M011',1,0)) click,
> sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0))
> vv FROM rt_ods.ods_applog_vidsplit where u_vid is
> not null and trim(u_vid) not null and trim(u_vid_group) not in ('','-1') and
> ( 

Re: flink读取kafka超时问题

2020-06-29 文章 zhisheng
hi,阿华田

你可以检查一下作业重启的时候,对应 Kafka 集群的 broker
机器上面的监控信息,看看对应时间是否有负载变高的情况,从而验证一下是否因为机器负载变高导致的读取超时?

Best!
zhisheng

Yichao Yang <1048262...@qq.com> 于2020年6月29日周一 下午7:50写道:

> Hi
>
>
> 看报错是说 dercd_seeme-3 partition 读取异常,可以检查下上游kafka的该partition是否有异常。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"阿华田" 发送时间:2020年6月29日(星期一) 上午10:36
> 收件人:"user-zh"
> 主题:flink读取kafka超时问题
>
>
>
> Caused by: java.lang.Exception:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
> before the position for partition dercd_seeme-3 could be determined
> 大佬们flink读取kafka遇到过这个错误没?现在情况是
> 每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker造成了很大的负载导致请求超时?
>
>
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 文章 zhisheng
可以测试一下

Tianwang Li  于2020年6月29日周一 下午8:13写道:

> >
> > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> >
> 用Row 和 Tuple 性能上会有差别吗?
>
> Jark Wu  于2020年6月19日周五 下午3:47写道:
>
> > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> >
> >
> > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> >
> > > 感谢你的回答,请问可否举一个参照例子?
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > > wangweigu...@stevegame.cn> 写道:
> > > >
> > > >   多个值组合在一起,当一个复合值使用!
> > > >
> > > >
> > > >
> > > >
> > > >发件人: 魏旭斌
> > > >发送时间: 2020-06-19 15:01
> > > >收件人: user-zh
> > > >主题: 关于拓展 Tuple元组的问题
> > > >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > > 请问有什么解决的方案? 谢谢
> > >
> >
>
>
> --
> **
>  tivanli
> **
>


Re: Re: flink 高可用问题

2020-06-29 文章 zhisheng
hi,Tony

你可以把 Checkpoint 间隔时间稍微设置大一些,看起来像是作业启动的时候 Task 还没 Running,就开始执行 Checkpoint
了,而 Checkpoint 是要求所有的 Task 是处于 Running 状态的,所以就会丢弃掉那次
Checkpoint,BT,就算有这个异常应该问题也不大,只要后面你的作业全启动成功了的话,则 Checkpoint 还是会成功的。

Best!

zhisheng

Tony  于2020年6月29日周一 下午8:16写道:

>
> 你好,我的flink运行环境是在k8s中,我先是打开了checkpoint功能,那样是可以用的,task失败了数据还可以恢复,但job失败了就不行了,所以我又配置flink的高可用,在job的yaml文件里设置了动态属性("-Dhigh-availability=zookeeper"),这样job启动时就出现那种警告,功能也不好用了。但如果配置在flink-config文件里的话就可以,不知道为什么?而我就是想用那个动态属性的方式,谢谢大神指点。
>
>
>
>
>
> --
> 发自我的网易邮箱手机智能版
> 
>
>
> - Original Message -
> From: tison 
> To: user-zh 
> Sent: Mon, 22 Jun 2020 15:08:04 +0800
> Subject: Re: flink 高可用问题
>
> 你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk
> 间隔又小,就这样了。
>
> 如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来
>
> Best,
> tison.
>
>
> Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:
>
> > Hi
> >
> >
> > 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"Tony" > 发送时间:2020年6月22日(星期一) 上午10:54
> > 收件人:"user-zh" >
> > 主题:flink 高可用问题
> >
> >
> >
> > 你好。
> >
> >
> > 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> > high-availability:zookeeper
> > high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> > high-availability.zookeeper.path.root:/flink
> > high-availability.cluster-id:/cluster_one
> > highavailability.storageDir:hdfs://master:9000/flink/ha
> >
> >
> > 我的flink和zookeeper都是在K8s的容器中
> > job启动出现如下问题:麻烦帮忙看一下,谢谢。
> > 2020-06-22 02:47:43,884 INFO
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print to
> > Std. Out, Filter -Query Map - Unwind - Custom Map -
> filter
> > - Data Transformation - Filter) (1/1) of job
> >  is not in state RUNNING but SCHEDULED
> > instead. Aborting checkpoint.
>


Re: flink batch on yarn任务容错

2020-06-29 文章 zhisheng
hi,张波,

使用 Checkpoint 的方式在遇到错误的时候会 failover,恢复的时候是从上一次完整 Checkpoint
的状态开始恢复,不会让你重新从最开始的数据开始读取计算。

Best !

zhisheng

张波 <173603...@qq.com> 于2020年6月29日周一 下午10:06写道:

> 场景如下:
> flink批处理中,如果出现错误,包括网络及其他原因,导致任务失败,此时会将整个任务重新跑一遍,就算只是其中一个tm出现了问题也是如此。
> 我有一个sink
> es的操作,由于数据量大,将其分拆成一个独立的batch任务,但是只要中间有导致tm挂掉的错误(非任务本身逻辑问题),任务就会从头执行,感觉非常不友好。
> 问题:是否可以用streamsink的方式,使用checkpoint来解决批处理整个重启的问题?或者在10甚至之后的版本有新的解决方式?


Re: 【Flink的transformations】

2020-06-29 文章 zhisheng
应该看名字就可以看出来对应关系的

忝忝向仧 <153488...@qq.com> 于2020年6月29日周一 下午10:29写道:

> Hi,all:
>
>
>
> 请教下,Flink的应用程序首先都会转为逻辑映射也就是transformations,我看org.apache.flink.streaming.api.transformations包下面目前有17种Transformation类(SourceTransformation,SplitTransformation,TwoInputTransformation等),有没有一个映射关系列表,也就是说应用程序里面哪些算子或者操作(比如map,flatmap,filter,connect,select等)会对应到哪一个Transformation类.
>
>
> 谢谢.


Re: flink1.9 on yarn

2020-06-27 文章 zhisheng
hi,guanyq

你这种提交方式属于 Flink On YARN 的 per job 模式,机制是这样的,当新提一个作业的时候,AppID 是会变化的。

Best!
zhisheng

Yangze Guo  于2020年6月28日周日 上午9:59写道:

> 我理解你需要使用session模式,即./bin/yarn-session.sh [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
>
> Best,
> Yangze Guo
>
> On Sun, Jun 28, 2020 at 9:10 AM guanyq  wrote:
> >
> > 问题1
> >
> > ./bin/flink run -m
> yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
> >
> > 当yarn application -kill application_1567067657620_0254后,
> >
> > 在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
> >
> > 问题2
> >
> > ./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
> >
> >
>


Re: Re: 为什么 flink checkpoint Checkpoint Duration (Async) 阶段耗时很慢

2020-06-27 文章 zhisheng
hi,立志:

从你的描述(能跑 10 几天且使用的是 FsStateBackend),可以提供一下 JobManager 和 TaskManager 的 GC
时间和次数的监控信息吗?怀疑是不是因为 Full GC 导致的问题。

Best!
zhisheng

张立志  于2020年6月28日周日 上午10:13写道:

> 从监控后台看back presure 是正常的,flatMap 这个任务是存在的,但只是连了一下mysql,没有别的任何操作,而且另一个job
> 没有flatmap ,单纯的map reduce
> 统计,能跑10几天,到1个多G的时侯就明显变慢,然后超时10分钟就报错了,从后台的错误日志里,没有明显的异常信息,都是checkpoint
> 超时后的信息.
> 在 2020-06-28 09:58:00,"LakeShen"  写道:
> >Hi 张立志,
> >
> >一般 Checkpoint 超时,可以先看看你的任务中,是否存在反压,比如 Sink 阶段,又或者是某个地方有 flatMap操作导致。
> >
> >然后看下自己任务中,是否存在热点问题等。如果一切都是正常的话,可以尝试使用 RocksDB 的增量 Checkpoint ,具体参考[1]。
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details
> >
> >Best,
> >LakeShen
> >
> >张立志  于2020年6月28日周日 上午9:52写道:
> >
> >> flink 版本1.8
> >> 部署集群yarn
> >>
> >>
> >> 配置代码:
> >> StreamExecutionEnvironment.stateBackend(new
> >>
> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
> >> 业务代码相对比较简单,内存占用较大
> >> 超过10分钟后开始报错,state 大概在1.5G时,开始耗时开始变长
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-27 文章 zhisheng
hi, Tianwang Li

看到有三个图片挂了,可以试着把图片上传到第三方的图床,然后贴个链接过来,另外:

> 任务经常会出现反压(特别是在窗口输出的时候)

这个检查一下窗口下游算子的情况,比如是不是窗口输出的数据过多,而 sink 的并发还和之前的保持一致,导致处理速度跟不上,从而导致的反压。


> 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)

这种也可以看看是不是 HDFS 有时候压力比较大导致的出现毛刺现象

另外建议补一下 UI 上 chekcpoint 相关的截图和日志信息,这样才能更好的定位问题。


Best !
zhisheng


Tianwang Li  于2020年6月28日周日 上午10:17写道:

> 关于Flink checkpoint偶尔会比较长时间的问题。
>
> *环境与背景:*
> 版本:flink1.10.0
> 数据量:每秒约10万左右的记录,数据源是kafka
> 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
> 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
>
> *问题:*
> 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
> source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting checkpoint消耗时间比较长。
>
> checkpoint情况大致如下:
>
> [image: image.png]
> [image: image.png]
> [image: image.png]
>
> 2020-06-24 21:09:53,369 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
>
> 2020-06-24 21:09:58,327 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:09:59,266 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:10:08,346 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:10:09,286 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
>
> 省略
>
>
> 2020-06-24 21:55:39,875 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:55:39,875 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:39,876 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
> checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
> -> Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy   -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://chk-316,
> share

Re: 如何快速定位拖慢速度的 operator

2020-06-25 文章 zhisheng
Hi 徐骁,

可以在 Flink Web UI 的 metric 那里添加每个算子的 in 和 out 的速度进行对比,然后知道到底是哪个算子处处理较慢,eg:

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-06-25-081928.png

可以看下上面图中的测试就是一个 operator chain 在一起的作业,可以看到每个算子并行度的进出流速。

Best !
zhisheng

徐骁  于2020年6月25日周四 上午12:51写道:

> 两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊
>


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 文章 zhisheng
Hi,Benchao

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-05-28-093940.jpg

这张图里面说的 TableEnvironment 不支持 UDAF/UDTF,那么如果想要用的话暂时有什么解决方法吗?社区大概什么时候会支持?

Thanks!

Benchao Li  于2020年5月28日周四 下午5:35写道:

> Hi,
>
> 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
>
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:27写道:
>
> > Hi, Benchao:
> >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:05
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:02写道:
> >
> > > Hi, Benchao:
> > > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 15:59
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午3:14写道:
> > >
> > > > Hi,all:
> > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > >
> > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > >
> > > > 其中a是kafka表,connector属性为:
> > > > 'connector.properties.group.id' = 'testGroup',
> > > > 'connector.startup-mode' = 'group-offsets'
> > > >
> > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: save point容灾方案咨询

2020-05-17 文章 zhisheng
hi

如果做 Checkpoint 或者 Savepoint 的时候可以填两个 HDFS 集群的地址路径(一个是你的主集群/另一个是容灾集群)
是不是就可以解决你现在的问题,达到你想要的需求?

Best

zhisheng

请叫我雷锋 <854194...@qq.com> 于2020年5月17日周日 下午7:32写道:

> 谢谢关注:
>
>
> savepoint 容灾 是指的,每次执行savepoint生成的文件,能够在容灾集群上做备份。当主集群变得不可用时,可以将任务迁移到容灾
> 集群进行根据savepoint 进行任务恢复。
>
>
> --原始邮件--
> 发件人:"Congxian Qiu" 发送时间:2020年5月17日(星期天) 晚上6:01
> 收件人:"user-zh"
> 主题:Re: save point容灾方案咨询
>
>
>
> 你好
>
> 请问这里的 savepoint 容灾的 “容灾” 具体是指什么呢?希望解决什么问题呢?
>
> Best,
> Congxian
>
>
> LakeShen 
>  Hi ,
> 
>  你可以把你的场景在描述的详细一些。
> 
>  Best,
>  LakeShen
> 
>  请叫我雷锋 <854194...@qq.com 于2020年5月14日周四 下午9:42写道:
> 
>   各位大佬好,请问有啥好的save point容灾方案嘛?
>  
>  
>  
>   发自我的iPhone
> 


Re: Flink1.10.1关于CliFronted命令行解析顺序引发的BUG

2020-05-15 文章 zhisheng
可以去提个 Issue

111  于2020年5月15日周五 下午5:19写道:

> Hi,
>
>
> 今天再升级Flink1.10.0到Flink1.10.1时,发现我们搭建的开发系统在使用YarnSession模式时无法正常工作,目前的架构是:
>
>
> [自己的平台]—发送sql--> [sql-gateway]—提交jobgraph-->[yarn]
>
>
> 跟踪代码发现,sql-gateway在启动时,需要调用CliFronted的loadCustomCommandLines来解析命令行参数。
>
>
> 在1.10.0版本中,命令行的顺序是:FlinkYarnSessionCLI, ExecutorCLI, DefaultCLI
> 在1.10.1版本中,命令行的顺序是:ExecutorCLI, FlinkYarnSessionCLI, DefaultCLI
>
>
> 修改原因是:
> https://issues.apache.org/jira/browse/FLINK-15852?jql=text%20~%20%22loadCustomCommandLines%22
>
>
>
> 这导致在解析Configuration的时候,sql-gateway无法定位到存储applicationId的配置文件(因为sql-gateway中仅适用第一个Active的commandLine)。
>
>
> 目前我这边的方案是改回原来的顺序,希望了解下官方的解决方案。
>
>
> Best,
> Xinghalo


Re: flink 历史数据join

2020-05-15 文章 zhisheng
看看 Flink UI 上 作业 task 的 sent 和 receive
的数据是否还在变更一般可以知道作业是否还在进行,等不动了,则意味着你这两个表固定的数据都已经 join 完了,等 checkpoint 也
complete 完成了即可以停掉作业。

实在不放心,不知道啥时候跑完,可以晚上开始跑,第二天白天再去看看就好了

jimandlice  于2020年5月15日周五 下午7:38写道:

> 是的 我想用datastrem 来做  join停的话 需要注意什么
>
>
>
>
> | |
> jimandlice
> |
> |
> 邮箱:jimandl...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年05月15日 19:36,zhisheng 写道:
> 所以现在纠结的是使用 DataStream 还是 DataSet ?
>
> 可以使用 DataStream,作业 join 完了停掉作业就行了。
>
> 小黑  于2020年5月15日周五 下午3:28写道:
>
> >
> > 先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据
> > 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道
> > 是用datatream还是dataset  没有一个很好的 解决方案 望给与回复
> >
> >
> >
> >
> >
>


Re: 回复:怎么排查taskmanager频繁挂掉的原因?

2020-05-15 文章 zhisheng
可以去 yarn 上找找 jobmanager 的日志,挂掉的作业,他的 jobmanager 日志应该还在的

Jeff  于2020年5月15日周五 下午3:28写道:

>
>
>
> 不是,是用per-job方式提交的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-05-15 14:14:20,"shao.hongxiao" <17611022...@163.com> 写道:
> >你的是batch 模式吗
> >
> >
> >
> >
> >| |
> >邵红晓
> >|
> >|
> >邮箱:17611022...@163.com
> >|
> >
> >签名由 网易邮箱大师 定制
> >
> >在2020年05月15日 15:05,Jeff 写道:
> >hi all,
>
> >最近上线一批新任务后taskmanager频繁挂掉,很可能是OOM问题,操作系统日志里没找到相关记录,flink日志只找到如下部分,但还是不确定是什么原因,请问要怎么确定原因呢?
> >
> >
> >
> >
> >id, channel, rowtime) -> select: (appid, channel, rowtime, 1 AS $f3)
> b91d36766995398a9b0c9416ac1fb6bc.
> >2020-05-14 08:55:30,504 ERROR org.apache.flink.runtime.taskmanager.Task -
> Task did not exit gracefully within 180 + seconds.
> >2020-05-14 08:55:30,505 ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Task did not exit
> gracefully within 180 + seconds.
> >2020-05-14 08:55:30,505 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Fatal error
> occurred while executing the TaskManager. Shutting it down...
> >2020-05-14 08:55:30,505 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopping TaskExecutor
> akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
> >2020-05-14 08:55:30,508 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader
> service.
> >2020-05-14 08:55:30,510 INFO
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
> Shutting down TaskExecutorLocalStateStoresManager.
> >2020-05-14 08:55:30,512 INFO 
> >org.apache.flink.runtime.io.disk.iomanager.IOManager
> - I/O manager removed spill file directory
> /tmp/flink-io-c882cdf4-840c-4c62-800e-0ca3226be20b
> >2020-05-14 08:55:30,512 INFO 
> >org.apache.flink.runtime.io.network.NetworkEnvironment
> - Shutting down the network environment and its components.
> >2020-05-14 08:55:30,514 INFO 
> >org.apache.flink.runtime.io.network.netty.NettyClient
> - Successful shutdown (took 2 ms).
> >2020-05-14 08:55:30,517 INFO 
> >org.apache.flink.runtime.io.network.netty.NettyServer
> - Successful shutdown (took 2 ms).
> >2020-05-14 08:55:30,545 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader
> service.
> >2020-05-14 08:55:30,546 INFO org.apache.flink.runtime.filecache.FileCache
> - removed file cache directory
> /tmp/flink-dist-cache-3e0d4351-d5aa-4358-a028-9a59f398d92f
> >2020-05-14 08:55:30,550 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor
> akka.tcp://flink@172.24.150.24:48412/user/taskmanager_0.
> >2020-05-14 08:55:30,552 INFO
> org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
> >2020-05-14 08:55:30,554 INFO
> org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
> >2020-05-14 08:55:30,563 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC
> service.
> >2020-05-14 08:55:30,566 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down
> remote daemon.
> >2020-05-14 08:55:30,567 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut
> down; proceeding with flushing remote transports.
> >2020-05-14 08:55:30,570 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down
> remote daemon.
> >2020-05-14 08:55:30,571 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut
> down; proceeding with flushing remote transports.
> >2020-05-14 08:55:30,571 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource switched from RUNNING to FAILED.
> >java.lang.RuntimeException: segment has been freed
> >at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> >at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> >at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> >at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> >at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> >at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> >at
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> >at DataStreamCalcRule$2658.processElement(Unknown Source)
> >at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
> >at
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> >at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> >at
> 

Re: flink 历史数据join

2020-05-15 文章 zhisheng
所以现在纠结的是使用 DataStream 还是 DataSet ?

可以使用 DataStream,作业 join 完了停掉作业就行了。

小黑  于2020年5月15日周五 下午3:28写道:

>
> 先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据
> 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道
> 是用datatream还是dataset  没有一个很好的 解决方案 望给与回复
>
>
>
>
>


Re: flink1.10 ddl metric 不显示

2020-05-15 文章 zhisheng
是不是因为作业chain在一起了,所以才看不到的?

了不起的盖茨比 <573693...@qq.com> 于2020年5月15日周五 下午3:22写道:

> DDL(source sink 都是kafka-connect) metric 不展示数据,比如接收了多少数据等


Re: Flink-SQL on yarn 的bug

2020-05-15 文章 zhisheng
这个应该不是 bug,如果用代码写,在定义了事件时间的时候,也是要加水印的,否则无法触发窗口的 trigger

guaishushu1...@163.com  于2020年5月15日周五 下午5:36写道:

> insert into t_report_realtime_fangxin2_order1
>
> SELECT date_format(TUMBLE_END(w_ts, INTERVAL '60' SECOND),'-MM-dd') as
> summary_date,
> date_format(TUMBLE_END(w_ts, INTERVAL '60' SECOND), '-MM-dd hh') as
> summary_hour,
> date_format(TUMBLE_END(w_ts, INTERVAL '60' SECOND), '-MM-dd hh:mm') as
> summary_minute,
> 'all' as city1_id,
> 'all' as cate3_id,
> count(DISTINCT(pay_order_id)) as order_num_dj3
> FROM (
> select w_ts,
> JsonIndexOf(ArrayIndexOf(w_data, 0), 'city_id') as city1_id,
> JsonIndexOf(ArrayIndexOf(w_data, 0), 'three_level_cate_id') as cate3_id,
> JsonIndexOf(ArrayIndexOf(w_data, 0), 'pay_order_id') as pay_order_id
> from hdp_lbg_huangye_payorder_binlog
> )
> GROUP BY TUMBLE(w_ts, INTERVAL '60' SECOND)
> ;
>
> 定义了eventime属性,但是算子并没有watermarks  导致数据一致不能输出
>
> --
> guaishushu1...@163.com
>


Re: execution.checkpointing.tolerable-failed-checkpoints 无效

2020-04-30 文章 zhisheng
这个参数好像可以作业里面单独设置,可以试试看

env.getCheckpointConfig().setTolerableCheckpointFailureNumber();

蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年4月30日周四 下午3:07写道:

> hi
> 
> 我在flink-conf.yaml中配置execution.checkpointing.tolerable-failed-checkpoints:
> 100无效,默认为0,也就是不容忍错误,这样的话一个checkpoint出错,job就要重启,这个值该怎么设置呢?
> best
> jungglge


Re: flink背压问题

2020-04-28 文章 zhisheng
hi,

数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度
10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。

1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样);

2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的?

3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况

Best !

zhisheng

阿华田  于2020年4月28日周二 上午9:37写道:

> 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?
>
>
> | |
> 王志华
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-23 文章 zhisheng


oliver yunchang  于2020年4月23日周四 上午12:32写道:

> 非常感谢Leonard Xu和zhisheng的回复
>
> > es index 的 mapping 是否提前设置好了?
> 提前设置好了,提前创建索引的mapping如下:
>   {
>   "xxx-2020.04.23": {
> "mappings": {
>   "doc": {
> "dynamic_templates": [
>   {
> "string_fields": {
>   "match": "*",
>   "match_mapping_type": "string",
>   "mapping": {
> "type": "keyword"
>   }
> }
>   }
> ],
> "properties": {
>   "cost": {
> "type": "long"
>   },
>   "result": {
> "type": "keyword"
>   }
> }
>   }
> }
>   }
> }
> 而待写入数据的字段远不止cost和result
> 查看ES官方文档对dynamic_templates的介绍:When putting new dynamic templates through
> the put mapping <
> https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html>
> API, all existing templates are overwritten.[1]
> 个人猜测是:已经设置的mapping未覆盖全数据字段、写入ES时依旧会调用put mapping API做修改,导致异常
>
> 重新调整了新索引的mapping为全字段,failed to process cluster event (put-mapping) within
> 30s异常消失了
>
> [1]
> https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
> <
> https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html#dynamic-templates
> >
> Best,
> Oliver yunchang
>
> > 2020年4月22日 下午4:47,zhisheng  写道:
> >
> > hi,
> >
> > es index 的 mapping 是否提前设置好了?
> >
> > 我看到异常 :
> >
> >> failed to process cluster event (put-mapping) within 30s
> >
> > 像是自动建 mapping 超时了
> >
> > Leonard Xu  于2020年4月22日周三 下午4:41写道:
> >
> >> Hi,
> >>
> >> 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
> >> 可以从这方面找思路排查下看看
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >>
> >>
> >>> 在 2020年4月22日,16:10,Oliver  写道:
> >>>
> >>> hi,
> >>>
> >>>
> >>> 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
> >>>
> >>
> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
> >>>
> >>>
> >>> 想咨询下这种问题应该怎么样排查和处理?
> >>>
> >>>
> >>> flink版本:1.10
> >>> ES版本:6.x
> >>>
> >>>
> >>> 使用jar:flink-sql-connector-elasticsearch6_2.12
> >>>
> >>>
> >>> 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
> >>> 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
> >>>
> >>>
> >>> ES异常如下:
> >>>
> >>>
> >>> 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
> >> Elasticsearch item request: ElasticsearchException[Elasticsearch
> exception
> >> [type=process_cluster_event_timeout_exception, reason=failed to process
> >> cluster event (put-mapping) within 30s]]org.apache.flink.
> >> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
> >> Elasticsearch exception [type=process_cluster_event_timeout_exception,
> >> reason=failed to process cluster event (put-mapping) within 30s]
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> >>>   at org.apache.flink.elasticsearch6.shaded.org
> >>
> .elasticsearch.client.

Re: 每天0点数据写入Elasticsearch异常且kafka数据堆积

2020-04-22 文章 zhisheng
hi,

es index 的 mapping 是否提前设置好了?

我看到异常 :

> failed to process cluster event (put-mapping) within 30s

像是自动建 mapping 超时了

Leonard Xu  于2020年4月22日周三 下午4:41写道:

> Hi,
>
> 提前创建的索引的shard配置是怎么样的?集群的reallocation、relocation配置怎样的?
> 可以从这方面找思路排查下看看
>
> 祝好,
> Leonard Xu
>
>
>
> > 在 2020年4月22日,16:10,Oliver  写道:
> >
> > hi,
> >
> >
> > 我有一个任务是使用flink将kafka数据写入ES,纯ETL过程,
> >
> 现在遇到的问题是:每天0点之后数据写入ES异常,同时监控发现kafka消息开始堆积,重启任务后,kafka消息堆积现象逐渐恢复,如果不重启则堆积问题一直存在。
> >
> >
> > 想咨询下这种问题应该怎么样排查和处理?
> >
> >
> > flink版本:1.10
> > ES版本:6.x
> >
> >
> > 使用jar:flink-sql-connector-elasticsearch6_2.12
> >
> >
> > 补充:数据零点之后00:00-00:01这一分钟之间存在少量写入成功的数据,但大量数据写入失败。其中索引携带日期后缀
> > 所以零点涉及索引切换,不过第二天索引是提前创建,0点之后数据写入并不涉及新索引的创建
> >
> >
> > ES异常如下:
> >
> >
> > 2020-04-18 00:01:31,722 ERROR ElasticsearchSinkBase: Failed
> Elasticsearch item request: ElasticsearchException[Elasticsearch exception
> [type=process_cluster_event_timeout_exception, reason=failed to process
> cluster event (put-mapping) within 30s]]org.apache.flink.
> elasticsearch6.shaded.org.elasticsearch.ElasticsearchException:
> Elasticsearch exception [type=process_cluster_event_timeout_exception,
> reason=failed to process cluster event (put-mapping) within 30s]
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:375)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.RestClient$1.completed(RestClient.java:366)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> >   at org.apache.flink.elasticsearch6.shaded.org
> .apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> >
> >
> > flinkSQL:
> > CREATE TABLE source_table (
> >  `time` VARCHAR
> >  ,`level` VARCHAR
> >  ,`thread` VARCHAR
> >  ,`class` VARCHAR
> > ) WITH (
> >  'connector.type' = 'kafka',
> >  'connector.version' = 'universal',
> >  'connector.topic' = '',
> >  'connector.startup-mode' = 'latest-offset',
> >  

Re: 【flink-connector-kafka】是否支持Subscribe模式

2020-04-21 文章 zhisheng
可以使用不同的 group.id 消费

i'mpossible <605769...@qq.com> 于2020年4月21日周二 下午6:12写道:

> Hi:
>  Flink支持Subscribe模式吗?用的connector版本是
> flink-connector-kafka-0.11_2.11,0.11x;
>  因为业务需要,我想要优雅下线掉TopicB,即不中断事件流;执行结果发现当Flink服务和A服务指定同一个group.id
> ,同时消费TopicA时,kafka偏移量提交失败(开启了检查点);
>
>
> 感谢解答!!!
>


Re: 如何看到他人问题

2020-04-21 文章 zhisheng
中文用户邮件列表可以看:http://apache-flink.147419.n8.nabble.com/

英文开发邮件列表可以看:http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

英文用户邮件列表可以看:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

tison  于2020年4月21日周二 下午5:52写道:

> cc
>
>
> Leonard Xu  于2020年4月21日周二 下午5:03写道:
>
> > Hi,
> > 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> > 可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自
> > user-zh@flink.apache.org 邮件组的邮件
> >
> > 邮件组的订阅管理,可以参考[1]
> >
> > 祝好,
> > Leonard Xu
> >
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
> >
> > > 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> > >
> > > 如何看到他人问题
> >
> >
>


Re: flink java.util.concurrent.TimeoutException

2020-04-16 文章 zhisheng
检查一下这个 TM 的 GC 次数和时间吧

Yangze Guo  于2020年4月15日周三 下午3:03写道:

> 日志上看是Taskmanager心跳超时了,如果tm还在,是不是网络问题呢?尝试把heartbeat.timeout调大一些试试?
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:40 AM 欧阳苗  wrote:
> >
> >
> job运行了两天就挂了,然后抛出如下异常,但是taskManager没有挂,其他的job还能正常在上面跑,请问这个问题是什么原因导致的,有什么好的解决办法吗
> >
> >
> > 2020-04-13 06:20:31.379 ERROR 1 --- [ent-IO-thread-3]
> org.apache.flink.runtime.rest.RestClient.parseResponse:393 : Received
> response was neither of the expected type ([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody])
> nor an error.
> Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"2d2a0b4efc8c3d973e2e9490b7b3b2f1","application-status":"FAILED","accumulator-results":{},"net-runtime":217272900,"failure-cause":{"class":"java.util.concurrent.TimeoutException","stack-trace":"java.util.concurrent.TimeoutException:
> Heartbeat of TaskManager with id 0a4ea651244982ef4b4b7092d18de776 timed
> out.\n\tat
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)\n\tat
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)\n\tat
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)\n\tat
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)\n\tat
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
> 

Re: FlinkSQL构建流式应用checkpoint设置

2020-04-16 文章 zhisheng
也就是说这种 sql cli 作业启动后如果 kill 掉的时候,再次重启的话是不能够从 savepoint 或者 chekcpoint 恢复是吗?

godfrey he  于2020年4月15日周三 下午4:32写道:

> Hi Even,
>
> 1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism
> 和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set
> execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink
> planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1]
> 另外 SQL CLI 还不支持 checkpoint 的设置。
> 2. 目前 SQL CLI 默认是 in-memory catalog,在每个SQL CLI的独立进程中,不会共享。如果SQL
> CLI挂掉,in-memory catalog 也会消失。你可以配置你的catalog为 hive catalog [1], 这样你创建的表会持久化到
> hive catalog 中,多个SQL CLI使用同一个hive catalog,可以达到你说期望的共享。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files
>
> Best,
> Godfrey
>
> Even <452232...@qq.com> 于2020年4月15日周三 下午3:35写道:
>
> > Hi!
> > 请教两个问题:
> > 1、 Flink SQL CLI 纯文本方式构建一个流式应用,在DDL语句中如何设置checkpoint和并行度这些参数?
> > 2、 Flink SQL CLI
> >
> 纯文本方式构建的流式应用创建的那些表,我在另外一个CLI中是无法找到这些table的,这是为什么?如果任务挂掉了,应该怎么重启,还是必须重新再构建?
>


Re: Re: 关于flink 提交job参数不生效的问题

2020-04-14 文章 zhisheng
可以试试设置 -ytm 2048m  看看是不是还这样

wangweigu...@stevegame.cn  于2020年4月14日周二
下午2:16写道:

>
>   应该是你设置的 -ytm 和 -yjm内存大小比yarn container最小容器内存都小吧!
>  yarn最小容器内存的参数: yarn.scheduler.minimum-allocation-mb
> 容器内存增量: yarn.scheduler.increment-allocation-mb
>
> 发件人: guanyq
> 发送时间: 2020-04-14 14:05
> 收件人: user-zh
> 主题: Re:Re: 关于flink 提交job参数不生效的问题
> ./bin/flink run -m yarn-cluster \-ynm TestDataProcess \-ytm 666
> \-yjm 666 \-c
> com.data.processing.unconditionalacceptance.TestDataProcess
> \./tasks/UnconditionalAcceptanceDataProcess.jar \--group.id
> Test001 \--checkpoint.interval 5000
> 在 2020-04-14 14:00:59,"Xintong Song"  写道:
> >你邮件里的图片没有显示出来。
> >建议把完整的启动命令贴一下。
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >On Tue, Apr 14, 2020 at 1:11 PM guanyq  wrote:
> >
> >> flink 提交jar包是 指定-ytm不起作用。想知道什么原因?
> >>
> >>
> >>
> >>
>


Re: Re: Flink 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!

2020-04-14 文章 zhisheng
应该加了 flink-connector-kafka_2.11-1.10.0.jar 这个就行

wangweigu...@stevegame.cn  于2020年4月13日周一
下午3:09写道:

>
> 感谢flink道友解答,谢谢!
>
>
> 目前是通过maven来开发flink程序,只是编译打包到集群运行的时候缺少kafka依赖包,flink-connector-kafka_2.11-1.10.0.jar,flink-connector-kafka-base_2.11-1.10.0.jar,kafka-clients-1.0.1-kafka-3.1.1.jar
> 这些添加到lib后,程序运行成功!
>
> 发件人: 刘宇宝
> 发送时间: 2020-04-13 14:59
> 收件人: user-zh@flink.apache.org
> 主题: Re: Flink
> 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
> 用官方项目模板起步,
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> 不要往 flink 里头加 jar 包,在你项目的 pom.xml 里加:
>
>   
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
> From: "wangweigu...@stevegame.cn" 
> Reply-To: "user-zh@flink.apache.org" 
> Date: Monday, April 13, 2020 at 2:32 PM
> To: user-zh 
> Subject: Flink
> 1.10读取kafka数据,打包jar,提交任务到集群运行测试,遇到java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题,求救大神帮忙解答,谢谢!
>
>
> 你好:
>
> 我在用Flink 1.10读取kafka数据,本地IDEA环境执行没有问题,将代码编译打包(不是fat
> jar)到集群上运行测试,执行提示:java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase问题。
> 我在Flink 1.10集群的每个节点下的
> /lib下都添加了kafka依赖包:flink-connector-kafka_2.11-1.10.0.jar
> 我启动的命令:
> 我先启动了一个Yarn session:
> yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
> 然后在session提交任务测试
> flink run -d -p 2 -m yarn-cluster -c
> com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
> application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
> 启动就报如下错误:
>[cid:_Foxmail.1@bf61ef0c-2f52-034d-bba5-a41cbf6b4faf]
>
> /lib下的依赖包:
> [cid:_Foxmail.1@0be9c7f1-1b24-8e3e-ea4f-d47b95d9ffaf]
>
> 代码片段:
> [cid:_Foxmail.1@76174c8c-512d-b948-71c9-359c474bf11e]
>
> 就是简单的读取数据,输出测试!
>
> 
> [
> https://exmail.qq.com/cgi-bin/viewfile?type=signature=ZX1328-4PdHqpEhbWjLSGE47md0b7k=688208663
> ]
>
>
>
>
>
>
> 史蒂夫软件(深圳)有限公司
> 技术部   王卫光
> wangweigu...@stevegame.cn
> 地址/Add:深圳南山科区科技园高新南十二道康佳研发大厦A座
> 手机/Mob:13128970998
> http://www.stevengame.com/
>


Re: 关于kafka connector通过python链接

2020-04-08 文章 zhisheng
hi, 秦寒

暂时还没有 Python 这块的 API,可以去社区 JIRA 提建议

Best

zhisheng

秦寒  于2020年4月8日周三 下午4:10写道:

> 您好
>
>Flink的 kafka connector 文档中只有java 和scala的列子,能否添加python
> 调用kafka的列子,包括如何添加kafka connector,kafka client的jar包配置到pyhon
> 环境等,谢谢。
>
>
>
>


Re: 关于flink生产日志问题

2020-04-08 文章 zhisheng
hi, guanyq

 正常来说任务运行的异常日志可以在 flink ui 中的 taskmanager 日志中查看,如果作业挂了或者被 kill 之后的话 ui
上是看不到日志了,但是可以在 yarn 上面找到该 container,查看该 jobmanager 的日志。

更好的做法是使用一些 log agent(比如 filebeat) 统一采集作业的日志,然后收集到 ElasticSearch
中,这样就可以查看历史的所有作业日志了

Best!

zhisheng

guanyq  于2020年4月8日周三 下午3:12写道:

> 您好:
>
>
>
>
> Run a single Flink job on YARN模式下,
>
> flink生产日志一般如何配置,及使用才能监控到任务运行是的异常和异常日志的。


Re: ddl es 报错

2020-03-24 文章 zhisheng
hi,Leonar Xu

官方 ES DDL 现在不支持填写 ES 集群的用户名和密码,我现在在公司已经做了扩展,加了这个功能,请问社区是否需要这个功能?我该怎么贡献呢?

效果如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png

Best Wishes!

zhisheng

Leonard Xu  于2020年3月24日周二 下午5:53写道:

> Hi, 出发
> 看起来是你缺少了依赖es connector的依赖[1],所以只能找到内置的filesystem connector,目前内置的filesystem
> connector只支持csv format,所以会有这个错误。
> 在项目中加上缺失的依赖即可,如果使用SQL CLI,也需要将依赖的jar包放到 flink的lib目录下。
>
> 
> org.apache.flink
> flink-sql-connector-elasticsearch6_2.11
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
>
> Best,
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
> >
>
>
> > 在 2020年3月23日,23:30,出发 <573693...@qq.com> 写道:
> >
> >
> > 源码如下:
> > CREATE TABLE buy_cnt_per_hour (
> > hour_of_day BIGINT,
> > buy_cnt BIGINT
> > ) WITH (
> > 'connector.type' = 'elasticsearch',
> > 'connector.version' = '6',
> > 'connector.hosts' = 'http://localhost:9200',
> > 'connector.index' = 'buy_cnt_per_hour',
> > 'connector.document-type' = 'user_behavior',
> > 'connector.bulk-flush.max-actions' = '1',
> > 'format.type' = 'json',
> > 'update-mode' = 'append'
> > )
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > public class ESTest {
> >
> > public static void main(String[] args) throws Exception {
> >
> > //2、设置运行环境
> > StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, settings);
> > streamEnv.setParallelism(1);
> > String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,
> buy_cnt BIGINT "
> > + ") WITH ( 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',"
> > + "'connector.hosts' = 'http://localhost:9200',
> 'connector.index' = 'buy_cnt_per_hour',"
> > + "'connector.document-type' = 'user_behavior',"
> > + "'connector.bulk-flush.max-actions' = '1',\n" + "
>   'format.type' = 'json',"
> > + "'update-mode' = 'append' )";
> > tableEnv.sqlUpdate(sinkDDL);
> > Table table = tableEnv.sqlQuery("select * from test_es ");
> > tableEnv.toRetractStream(table, Row.class).print();
> > streamEnv.execute("");
> > }
> >
> > }
> > 具体error
> > The matching candidates:
> > org.apache.flink.table.sources.CsvAppendTableSourceFactory
> > Mismatched properties:
> > 'connector.type' expects 'filesystem', but is 'elasticsearch'
> > 'format.type' expects 'csv', but is 'json'
> >
> > The following properties are requested:
> > connector.bulk-flush.max-actions=1
> > connector.document-type=user_behavior
> > connector.hosts=http://localhost:9200
> > connector.index=buy_cnt_per_hour
> > connector.type=elasticsearch
> > connector.version=6
> > format.type=json
> > schema.0.data-type=BIGINT
> > schema.0.name=hour_of_day
> > schema.1.data-type=BIGINT
> > schema.1.name=buy_cnt
> > update-mode=append
>
>


Re: flink sql 去重算法

2020-03-20 文章 zhisheng
hi, LakeShen

那我这边的场景和你还不太一样,我这边主要是 SQL 作业才有这种问题,算法和数仓的同学他们没有使用 DataStream API 开发,自然也用不上
ProcessFunction 和
Timer,另外场景也不太一样,我遇到的这几个大状态作业才开三小时的滑动窗口(一分钟滑一次),没有你的那种(一天/三天)这么长。不过还是感谢你!

Best wishes,
zhisheng

LakeShen  于2020年3月20日周五 下午3:23写道:

> Hi zhisheng,
>
> 咱们遇到的问题差不多,昨天遇到一个滑动窗口状态很大的问题,由于业务方设置的滑动窗口的窗口时间大(比如一天、三天),同时也是统计 count
> 之类的操作,状态很大。
> 这种滑动窗口操作,我觉得可以先通过滚动窗口(比如10、20分钟)来计算一次,然后业务方使用的时候,扫描最近一段的时间滚动窗口计算的指标值,然后相加。
>
> 还有一种方式,就是使用 ProcessFunction + Timer 的方式来处理这种滑动窗口的计算[1]。
>
> 类似 count(distinct) 这种,目前我还没有比较好的方式来解决,也在研究中。
>
> [1]
>
> https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window
>
> Best wishes,
> LakeShen
>
>
>
>
> zhisheng  于2020年3月20日周五 下午2:21写道:
>
> > hi,LakeShen
> >
> > 1、那个大状态作业之前是我们算法同学写的,是没加官网说的 query_configuration 这个配置,在我的指导下,已经加上
> >
> > 2、Flink 框架层我已经做了默认的配置,使用 RocksDB,并且是增量的,但是还是发现每次 Checkpoint 状态非常大
> >
> > 最近我梳理了下公司的大状态作业,发现通常有这么几个特性:
> >
> > 1、SQL 作业
> >
> > 2、长时间的分组滑动窗口
> >
> > 3、使用 distinct 等关键字的
> >
> > 因为是 SQL
> >
> >
> 作业,开发可能只关心了自己的业务逻辑,而没有去关注这种性能的问题,所以也就可能会导致这种大状态的问题,目前是我自己把这些大状态的作业捞出来后,都一个个联系优化后再上线的,后面我再看看怎么在框架层做到加上这种优化的配置。
> >
> > Best wishes,
> >
> > zhisheng
> >
> > LakeShen  于2020年3月20日周五 下午1:36写道:
> >
> > > Hi zhisheng,
> > >
> > > 我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。
> > > 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置),
> > > 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。
> > >
> > > 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。
> > > Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用
> > > Compaction Filter 算法来清理。
> > >
> > > 第二个就是使用增量 Checkpoint 方式吧。
> > >
> > > Best wishes,
> > > LakeShen
> > >
> > >
> > >
> > > lucas.wu  于2020年3月20日周五 上午11:50写道:
> > >
> > > > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。
> > > >
> > > >
> > > > 原始邮件
> > > > 发件人:zhishengzhisheng2...@gmail.com
> > > > 收件人:user-zhuser...@flink.apache.org
> > > > 发送时间:2020年3月20日(周五) 11:44
> > > > 主题:Re: flink sql 去重算法
> > > >
> > > >
> > > > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint
> state
> > > > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS
> > > > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration ">
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > > > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li libenc...@gmail.com
> 于2020年3月20日周五
> > > > 上午9:50写道:  Hi hiliuxg,   count distinct 用的MapVIew来做的去重:
> > > > 在batch场景下,MapView的底层实现就是HashMap;
> > > > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。
> > > >  hiliuxg 736742...@qq.com 于2020年3月19日周四 下午11:31写道:hi all:
> >  请问flink
> > > > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ?
> > > >  还是简单通过java的set容器去重的呢? --   Benchao Li  School of Electronics
> > > > Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>


Re: flink sql 去重算法

2020-03-20 文章 zhisheng
hi,LakeShen

1、那个大状态作业之前是我们算法同学写的,是没加官网说的 query_configuration 这个配置,在我的指导下,已经加上

2、Flink 框架层我已经做了默认的配置,使用 RocksDB,并且是增量的,但是还是发现每次 Checkpoint 状态非常大

最近我梳理了下公司的大状态作业,发现通常有这么几个特性:

1、SQL 作业

2、长时间的分组滑动窗口

3、使用 distinct 等关键字的

因为是 SQL
作业,开发可能只关心了自己的业务逻辑,而没有去关注这种性能的问题,所以也就可能会导致这种大状态的问题,目前是我自己把这些大状态的作业捞出来后,都一个个联系优化后再上线的,后面我再看看怎么在框架层做到加上这种优化的配置。

Best wishes,

zhisheng

LakeShen  于2020年3月20日周五 下午1:36写道:

> Hi zhisheng,
>
> 我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。
> 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置),
> 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。
>
> 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。
> Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用
> Compaction Filter 算法来清理。
>
> 第二个就是使用增量 Checkpoint 方式吧。
>
> Best wishes,
> LakeShen
>
>
>
> lucas.wu  于2020年3月20日周五 上午11:50写道:
>
> > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。
> >
> >
> > 原始邮件
> > 发件人:zhishengzhisheng2...@gmail.com
> > 收件人:user-zhuser...@flink.apache.org
> > 发送时间:2020年3月20日(周五) 11:44
> > 主题:Re: flink sql 去重算法
> >
> >
> > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state
> > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS
> > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration ">
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li libenc...@gmail.com 于2020年3月20日周五
> > 上午9:50写道:  Hi hiliuxg,   count distinct 用的MapVIew来做的去重:
> > 在batch场景下,MapView的底层实现就是HashMap;
> > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。
> >  hiliuxg 736742...@qq.com 于2020年3月19日周四 下午11:31写道:hi all:   请问flink
> > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ?
> >  还是简单通过java的set容器去重的呢? --   Benchao Li  School of Electronics
> > Engineering and Computer Science, Peking University  Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: flink sql 去重算法

2020-03-19 文章 zhisheng
hi,

我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state
很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS
集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration

 ,除此之外不清楚大家是否还有什么其他好的解决方法?

Benchao Li  于2020年3月20日周五 上午9:50写道:

> Hi hiliuxg,
>
> count distinct 用的MapVIew来做的去重:
> 在batch场景下,MapView的底层实现就是HashMap;
> 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。
>
> hiliuxg <736742...@qq.com> 于2020年3月19日周四 下午11:31写道:
>
> > hi all:
> > 请问flink sql count(disitinct) 底层的算法是怎样的? 是bitmap ?
> > 还是简单通过java的set容器去重的呢?
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Field types of query result and registered TableSink [Result] do not match

2020-03-18 文章 zhisheng
好的,了解了,多谢 Jark

Jark Wu  于2020年3月19日周四 上午10:39写道:

> Hi zhisheng,
>
> 目前 kafka source & jdbc sink 都是用的 TypeInformation ,所以都只能声明成 38, 18 或者直接写
> DECIMAL ,默认就是 38, 18。
> 这个问题会在升级到 new source/sink interface (FLIP-95)后有效解决。
>
> Best,
> Jark
>
> On Thu, 19 Mar 2020 at 10:31, zhisheng  wrote:
>
> > hi, Jark
> >
> > 我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下:
> >
> > String ddlSource = "CREATE TABLE test (\n" +
> > "yidun_score numeric(5, 2)\n" +
> > ") WITH (\n" +
> > "'connector.type' = 'kafka',\n" +
> > "'connector.version' = '0.11',\n" +
> > "'connector.topic' = 'test',\n" +
> > "'connector.startup-mode' = 'latest-offset',\n" +
> > "'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> > "'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> > "'format.type' = 'json'\n" +
> > ")";
> >
> > String ddlSink = "CREATE TABLE test_aggregate (\n" +
> > "yidun_score numeric(5, 2)\n" +
> > ") WITH (\n" +
> > "'connector.type' = 'jdbc',\n" +
> > "'connector.driver' = 'org.postgresql.Driver',\n" +
> > "'connector.url' =
> > 'jdbc:postgresql://localhost:3600/test',\n" +
> > "'connector.table' = 'test_aggregate', \n" +
> > "'connector.username' = 'admin', \n" +
> > "'connector.password' = '1234546',\n" +
> > "    'connector.write.flush.max-rows' = '1' \n" +
> > ")";
> >
> > String sql = "insert into test_aggregate select yidun_score from
> > test";
> >
> > blinkStreamTableEnv.sqlUpdate(ddlSource);
> > blinkStreamTableEnv.sqlUpdate(ddlSink);
> > blinkStreamTableEnv.sqlUpdate(sql);
> >
> > 没有自定义过 TableSink
> >
> >
> >
> > Jark Wu  于2020年3月19日周四 上午9:43写道:
> >
> > > Hi zhisheng,
> > >
> > > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。
> > > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。
> > > 这是框架做的一个合法性校验。
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 19 Mar 2020 at 09:33, zhisheng  wrote:
> > >
> > > > hi, Jark
> > > >
> > > > 我刚使用 1.10.0 测试,报错异常如下:
> > > >
> > > > Exception in thread "main"
> > > org.apache.flink.table.api.ValidationException:
> > > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with
> the
> > > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field
> > of
> > > > the TableSink consumed type.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
> > > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.f

Re: Field types of query result and registered TableSink [Result] do not match

2020-03-18 文章 zhisheng
对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner

Jark Wu  于2020年3月18日周三 下午11:47写道:

> Hi zhisheng,
>
> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
>
> On Wed, 18 Mar 2020 at 22:21, zhisheng  wrote:
>
> > hi, all
> >
> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> yidun_score
> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: Field types of query result and registered TableSink
> > [Result] do not match.
> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > TableSink schema:[user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> >
> >
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> > at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > at
> >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > Caused by: org.apache.flink.table.api.ValidationException: Field types of
> > query result and registered TableSink [Result] do not match.
> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > TableSink schema:[user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > at
> >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> > at scala.Option.map(Option.scala:146)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >
> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个
> > bug?
> >
>


Field types of query result and registered TableSink [Result] do not match

2020-03-18 文章 zhisheng
hi, all

我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL yidun_score
字段也是定义的 numeric(5,2) 类型,结果会报异常。

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
[Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
TableSink schema:[user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink [Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
TableSink schema:[user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)

我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 bug?


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 zhisheng
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-11-133919.png

我看现在还不支持 per job 模式,哎

zhisheng  于2020年3月11日周三 下午9:31写道:

> 好的,我先去 look look,感谢
>
> Kurt Young  于2020年3月11日周三 下午9:30写道:
>
>> https://github.com/ververica/flink-sql-gateway  了解一下
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:
>>
>> > hi, Kurt Young
>> >
>> > 除了使用 sql-client 可以使用纯 SQL
>> 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
>> > sql-client
>> >
>> > Kurt Young  于2020年3月11日周三 下午7:59写道:
>> >
>> > > 那有可能是可以的,你可以试试看
>> > >
>> > > Best,
>> > > Kurt
>> > >
>> > >
>> > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
>> > > wangl...@geekplus.com.cn> wrote:
>> > >
>> > > > Hi Kurt,
>> > > >
>> > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从
>> state
>> > > > 中恢复的功能吗?
>> > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
>> > > > 存储并且再次提交任务可以被访问到直接用吗?
>> > > >
>> > > > 谢谢,
>> > > > 王磊
>> > > >
>> > > > --
>> > > > wangl...@geekplus.com.cn
>> > > >
>> > > >
>> > > > *Sender:* Kurt Young 
>> > > > *Send Time:* 2020-03-11 12:54
>> > > > *Receiver:* wangl...@geekplus.com.cn
>> > > > *cc:* user-zh 
>> > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> > > > sql client 目前还不支持这个功能。
>> > > >
>> > > > Best,
>> > > > Kurt
>> > > >
>> > > >
>> > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
>> > > > wangl...@geekplus.com.cn> wrote:
>> > > >
>> > > >> Hi Kurt,
>> > > >> 确实是可以 直接 flink  cancel -s 保存状态。
>> > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>> > > >>
>> > > >> 谢谢,
>> > > >> 王磊
>> > > >>
>> > > >>
>> > > >> *Sender:* Kurt Young 
>> > > >> *Send Time:* 2020-03-11 10:38
>> > > >> *Receiver:* user-zh 
>> > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>> > > >>
>> > > >> Best,
>> > > >> Kurt
>> > > >>
>> > > >>
>> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> > > >> wangl...@geekplus.com.cn> wrote:
>> > > >>
>> > > >> > 有两个表:
>> > > >> > tableA: key  valueA
>> > > >> > tableB: key  valueB
>> > > >> >
>> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
>> > valueA
>> > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> > > >> >
>> > > >> > 谢谢,
>> > > >> > 王磊
>> > > >> >
>> > > >>
>> > > >>
>> > >
>> >
>>
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 zhisheng
好的,我先去 look look,感谢

Kurt Young  于2020年3月11日周三 下午9:30写道:

> https://github.com/ververica/flink-sql-gateway  了解一下
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:
>
> > hi, Kurt Young
> >
> > 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> > sql-client
> >
> > Kurt Young  于2020年3月11日周三 下午7:59写道:
> >
> > > 那有可能是可以的,你可以试试看
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> > > wangl...@geekplus.com.cn> wrote:
> > >
> > > > Hi Kurt,
> > > >
> > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > > > 中恢复的功能吗?
> > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > > > 存储并且再次提交任务可以被访问到直接用吗?
> > > >
> > > > 谢谢,
> > > > 王磊
> > > >
> > > > --
> > > > wangl...@geekplus.com.cn
> > > >
> > > >
> > > > *Sender:* Kurt Young 
> > > > *Send Time:* 2020-03-11 12:54
> > > > *Receiver:* wangl...@geekplus.com.cn
> > > > *cc:* user-zh 
> > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > > sql client 目前还不支持这个功能。
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > > > wangl...@geekplus.com.cn> wrote:
> > > >
> > > >> Hi Kurt,
> > > >> 确实是可以 直接 flink  cancel -s 保存状态。
> > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> > > >>
> > > >> 谢谢,
> > > >> 王磊
> > > >>
> > > >>
> > > >> *Sender:* Kurt Young 
> > > >> *Send Time:* 2020-03-11 10:38
> > > >> *Receiver:* user-zh 
> > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> > > >>
> > > >> Best,
> > > >> Kurt
> > > >>
> > > >>
> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> > > >> wangl...@geekplus.com.cn> wrote:
> > > >>
> > > >> > 有两个表:
> > > >> > tableA: key  valueA
> > > >> > tableB: key  valueB
> > > >> >
> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
> > valueA
> > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> > > >> >
> > > >> > 谢谢,
> > > >> > 王磊
> > > >> >
> > > >>
> > > >>
> > >
> >
>


  1   2   >