Re: Re: flink对task分配slot问题
明白了,非常感谢您的回复! Thanks, Hongyang liuhy_em...@163.com 发件人: Xintong Song 发送时间: 2020-07-03 13:02 收件人: user-zh 主题: Re: flink对task分配slot问题 Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个 subtask 不能分配到同一个 slot 中”这样的一个规则。 举个例子: 如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。 那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。 所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。 通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。 因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。 Thank you~ Xintong Song On Fri, Jul 3, 2020 at 11:12 AM liuhy_em...@163.com wrote: > Dear, > > 请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗? > flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗? > > Thanks, > Hongyang >
kafkaf To mysql 写入问题
dear: 请教两个问题 1) 用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样, 改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225 不知道,用Kafka类的话,如何解决 .connect( new Kafka() .version("0.10") .topic("test-input") 2) 对于timestamp类型字段,用JDBCAppendTableSink 把DataStream写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误 kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的 jdbc参数类型设置为Types.SQL_TIMESTAMP thanks
Re: flink对task分配slot问题
Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个 subtask 不能分配到同一个 slot 中”这样的一个规则。 举个例子: 如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。 那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。 所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。 通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。 因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。 Thank you~ Xintong Song On Fri, Jul 3, 2020 at 11:12 AM liuhy_em...@163.com wrote: > Dear, > > 请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗? > flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗? > > Thanks, > Hongyang >
回复:在没有开启Checkpoint的情况下,当作业重启时,历史数据是否会残留在内存中?
这种现象只会出现在on rocksdb中。 | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制 在2020年07月03日 11:21,SmileSmile 写道: Hi 我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。 历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime() 的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史数据是否一直残留在内存中无法清理? 是否有哪位大佬可以帮忙解惑? | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制
在没有开启Checkpoint的情况下,当作业重启时,历史数据是否会残留在内存中?
Hi 我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。 历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime() 的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史数据是否一直残留在内存中无法清理? 是否有哪位大佬可以帮忙解惑? | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制
flink对task分配slot问题
Dear, 请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗? flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗? Thanks, Hongyang
Re: Flink job不定期就会重启,版本是1.9
从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。 Thank you~ Xintong Song On Fri, Jul 3, 2020 at 10:48 AM noon cjihg wrote: > Hi,大佬们 > > Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗? > > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. > 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator > flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut > down. > 2020-07-01 20:20:43.875 [flink-metrics-16] INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator > flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut > down. > 2020-07-01 20:20:43.891 [flink-metrics-16] INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC > service. > 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating > cluster entrypoint process YarnJobClusterEntrypoint with exit code 2. > java.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1 > ms]. Message of type > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical > reason for `AskTimeoutException` is that the recipient actor didn't > send a reply. > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) > at akka.dispatch.OnComplete.internal(Future.scala:263) > at akka.dispatch.OnComplete.internal(Future.scala:261) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) > at java.lang.Thread.run(Thread.java:745) > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/resourcemanager#-781959047]] after [1 > ms]. Message of type > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical > reason for `AskTimeoutException` is that the recipient actor didn't > send a reply. > at > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at > akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) > ... 9 common frames omitted >
Flink job不定期就会重启,版本是1.9
Hi,大佬们 Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗? 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.891 [flink-metrics-16] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service. 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2. java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) at java.lang.Thread.run(Thread.java:745) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) ... 9 common frames omitted
Re: 做实时数仓,sql怎么保证分topic区有序
kafka默认分区有序,所以source的并发一般小于等于kafka的partition数,理想状态是1:1 sink的并发一般也是也是和输出topic相关,如果要保证有序,可以按key进行分区, 保证数据均匀可以自定义分区策略,比如roundrobin、shuffle等 > 2020年7月2日 下午6:39,air23 写道: > > hi > 就是我用 > flink sql 通过ddl读取和写入kafka怎么设置并行度呢? > flink sql 通过ddl写入kafka怎么自定义分区呢? > > > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。 > > > > >
Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
public void invoke(ObjectNode node, Context context) throws Exception { String tableName = node.get("metadata").get("topic").asText(); Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10)); meter.markEvent(); log.info("### counter: " + meter.toString() + "\t" + meter.getCount()); 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics. 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。 谢谢, 王磊 wangl...@geekplus.com.cn Sender: kcz Send Time: 2020-07-03 09:13 Receiver: wanglei2 Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧 -- 原始邮件 -- 发件人: 王磊2 发送时间: 2020年7月2日 21:46 收件人: user-zh , 17610775726 <17610775...@163.com> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 没有明白你说的实现方式。 我最终要得到类似的 Metrics: myCounter_table1, myCounter_table2, ..., myCounter_tableX 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。 谢谢, 王磊 -- 发件人:JasonLee <17610775...@163.com> 发送时间:2020年7月2日(星期四) 21:12 收件人:user-zh 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 你把tablename传到下面metric里不就行了吗 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道: 全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: flink任务提交方式
Hi, 1.10.x版本以后env.execute()是返回一个JobExecutionResult 对象的,这里面可以获取到job相关信息,比如你想要的jobid > 2020年7月2日 下午12:09,Dream-底限 写道: > > hi > 请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具
Re: 做实时数仓,sql怎么保证分topic区有序
Hi air23, > flink sql 通过ddl读取和写入kafka怎么设置并行度呢? 你可以为你的程序设置默认的并发度,代码或者命令行参数,配置文件都可以。 > flink sql 通过ddl写入kafka怎么自定义分区呢? kafka sink 自定义分区器: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#kafka-connector , 将 'connector.sink-partitioner'设置为 'custom', 然后设置 ' connector.sink-partitioner-class'. Best, LakeShen shizk233 于2020年7月2日周四 下午7:46写道: > Hi air23, > > sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。 > 你可以试试流转表,可以做到细粒度的控制。 > > Best, > shizk233 > > air23 于2020年7月2日周四 下午6:40写道: > > > hi > > 就是我用 > >flink sql 通过ddl读取和写入kafka怎么设置并行度呢? > >flink sql 通过ddl写入kafka怎么自定义分区呢? > > > > > > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。 > > > > > > > > > > > > >
Re: Flink Training - why cannot keyBy hour?
Hi David, Thanks a lot for the explanation! Eleanore On Thu, Jul 2, 2020 at 6:30 AM David Anderson wrote: > Eleanore, > > Yes, if you change the implementation in the way that is suggested by the > slide, the tests will fail. But it's more interesting to observe the > behavior in the console. > > The notes that go with that slide explain the situation in more detail. > (Use alt-p or option-p to see the notes). But to recap here, there are two > related effects: > > (1) Instead of producing a single result at the end of the window, this > alternative implementation produces a result for every event. In other > words, it produces a stream that eventually arrives at the same maximum > value produced by the timeWindowAll. > > (2) With timeWindowAll, once the results for a given hour have been > produced, Flink frees the state associated with the window for that hour. > It knows, based on the watermarking, that no more events are expected, so > the state is no longer needed and can be cleared. But with maxBy, the state > for each key (each hour) is kept forever. This is why this is not a good > approach: the keyspace is unbounded, and we can't intervene to clean up > stale state. > > Regards, > David > > On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin > wrote: > >> Hi experts, >> >> I am going through Ververica flink training, and when doing the lab with >> window (https://training.ververica.com/exercises/windows), basically it >> requires to compute within an hour which driver earns the most tip. >> >> The logic is to >> 0. keyBy driverId >> 1. create 1 hour window based on eventTime >> 2. sum up all the tips for this driver within this 1 hour window >> 3. create an 1 hour globalWindow for all drivers >> 4. find the max tips >> >> sample code shown as below. >> >> SingleOutputStreamOperator> >> aggregatedTipsPerDriver = fares.keyBy(rides -> rides.driverId) >> .window(TumblingEventTimeWindows.of(Time.hours(1))) >> .process(new SumTipsFunction()); >> >> // Tuple3: reporting the timestamp for the end of the hour, the driverId, >> and the total of that driver's tips for that hour >> SingleOutputStreamOperator> hourlyMax = >> >> aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1))) >> .maxBy(2); >> >> >> The question is shown as 4th slide: why we cannot keyed by the hour? >> >> If I change the implementation to keyBy hour and run the HourlyTipsTest, >> >> the test of testMaxAcrossDrivers will fail: >> >> // (94668840,1,6.0) -> for timestamp window: 94668840, driverId: 1, >> earns most tip: 6.0 >> >> Expected :[(94668840,1,6.0), (94669200,2,20.0)] >> Actual :[(94668840,1,6.0), (94669200,2,20.0), >> (94669200,2,20.0)] >> >> >> [image: image.png] >> >> Thanks a lot! >> Eleanore >> >>
回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
没有明白你说的实现方式。 我最终要得到类似的 Metrics: myCounter_table1, myCounter_table2, ..., myCounter_tableX 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。 谢谢, 王磊 -- 发件人:JasonLee <17610775...@163.com> 发送时间:2020年7月2日(星期四) 21:12 收件人:user-zh 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 你把tablename传到下面metric里不就行了吗 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道: 全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
你把tablename传到下面metric里不就行了吗 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道: 全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: 做实时数仓,sql怎么保证分topic区有序
Hi air23, sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。 你可以试试流转表,可以做到细粒度的控制。 Best, shizk233 air23 于2020年7月2日周四 下午6:40写道: > hi > 就是我用 >flink sql 通过ddl读取和写入kafka怎么设置并行度呢? >flink sql 通过ddl写入kafka怎么自定义分区呢? > > > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。 > > > > > >
Re: Position out of bounds.
感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? On 7/2/2020 18:39,夏帅 wrote: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
回复:Position out of bounds.
不好意思,看错了,这里是自增了 来自钉钉专属商务邮箱-- 发件人:xuhaiLong 日 期:2020年07月02日 18:46:37 收件人:夏帅 抄 送:user-zh 主 题:回复:Position out of bounds. 感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? | | 夏* | | 邮箱:xiagu...@163.com | 签名由 网易邮箱大师 定制 在2020年07月02日 18:39,夏帅 写道: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
回复:Position out of bounds.
感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? | | 夏* | | 邮箱:xiagu...@163.com | 签名由 网易邮箱大师 定制 在2020年07月02日 18:39,夏帅 写道: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
做实时数仓,sql怎么保证分topic区有序
hi 就是我用 flink sql 通过ddl读取和写入kafka怎么设置并行度呢? flink sql 通过ddl写入kafka怎么自定义分区呢? 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。
回复:Position out of bounds.
你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
rocksdb的block cache usage应该如何使用
通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是 flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。 我们的作业一个TM的内存设置如下: taskmanager.memory.process.size: 23000m taskmanager.memory.managed.fraction: 0.4 ui上显示的Flink Managed MEM是8.48G。 通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。 sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host) 如果维度是host,operator_name,每个operator_name维度是22G。 sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host,operator_name) 请问这个指标应该如何使用? | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制
Position out of bounds.
flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5 2020-07-02 17:06:19,409 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d847db42ed1d92ac373f9ccf27b846f0) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,410 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (3/3) (ca825ba9712eb520ff6de6b0f9de4dc1) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,426 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (3/3) (b05f5b66fd4c65a9032bb0140a4ce3d1) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,472 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- groupBy: (userId, sourceUuid, categoryId, gender), select: (userId, sourceUuid, categoryId, gender, MAX(siteId) AS siteId, MAX(score) AS score) -> select: (userId, categoryId, gender, score, siteId) (1/3) (f8f49357b9121d97816d5f83569cd6ac) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,472 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (37befed4aefab35588e5f6d4c372b8c4) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,492 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (2de7f2a3809e8d7e97197cbc0f7c8b4b) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,498 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (1fe6456a019617839a573d55b1194541) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:52,263 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@61d8240a. java.lang.IllegalArgumentException: Position out of bounds. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:368) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:189) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(RocksDBSerializedCompositeKeyBuilder.java:144) at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:149) at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:120) at org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:61) at org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92) at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:60) at org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:93) at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72) at com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:272) at com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:237) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
回复: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
全都是同一种类型的 metrics. 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 meter 类型) 谢谢, 王磊 wangl...@geekplus.com.cn 发件人: JasonLee 发送时间: 2020-07-02 16:16 收件人: user-zh 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? 是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ? | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: 官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
flink asynctablefunction调用异常
hi, 我在使用flink 1.10.1 blink planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。 遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : java.lang.Exception: Could not complete the stream element: org.apache.flink.table.dataformat.BinaryRow caused by : java.util.concurrent.TimeoutException: Async function call has timed out. 我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。
在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
官网上的例子: public class MyMapper extends RichMapFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); } @Override public String map(String value) throws Exception { this.counter.inc(); return value; } } 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: flink基于yarn的HA次数无效,以及HA拉起的任务是否可以重用state
判断 Attempt 失败的标准是 Flink 通过 AMRMClientAsyncImpl 通知 YARN RM Application 失败并注销自己,所以 kill jm 是不算的。 Best, Paul Lam > 2020年7月2日 11:09,liangji 写道: > > 我之前配置了HA,也配置了flink中yarn-attempts=2,结果是kill jm进程可以无限重启 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于flink sql问题
Hi 本超 关于Mysql 做维表,关掉cache后的优化手段,有什么建议吗? 比如,20k records per second 的流量,关掉 cache 会对 mysql 产生很大的压力。不知道 MySQL Lookup 做成 async + batch 会不会提升性能或者有副作用。 Best forideal. 在 Benchao Li ,2020年7月1日 13:22写道: 我理解你只需要把这同一个Mysql表再做一个维表即可。可以写两次DDL,一个给维表用,一个给sink用。 如果你就觉得它是实时变化的,你可以把维表的cache关掉,保证每次都是获取Mysql中最新的数据就可以了吧? 当然了,在DDL的时候并没有区分这个表是维表还是sink表,具体它是什么类型,只是根据你在SQL里面怎么使用来决定的。 理论上来讲,你一个DDL可以同时做维表也可以做Sink。(只是它们可能有些配置会不同,分开写两个DDL应该是更清晰一些) zya 于2020年6月30日周二 下午11:26写道: > 请问下,sink写出的表能做维表吗,因为sink会一直写入,做维表的话会一直动态变化 > > > > > > > > > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年6月30日(星期二) 晚上11:14 > 收件人:"user-zh" > 主题:Re: 关于flink sql问题 > > > > 应该做一个维表Join就可以了。 > > > zya > Hi 各位,有个问题想请教一下: > nbsp; nbsp; 目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql, > > > nbsp;nbsp;nbsp;nbsp;在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗? > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: flink任务提交方式
好的,感谢 On Thu, Jul 2, 2020 at 12:37 PM jianxu wrote: > 你可以看下这个项目https://github.com/todd5167/clusters-submiter,改造下应该满足你的需求。 > 在 2020-07-02 12:09:05,"Dream-底限" 写道: > >hi > > >请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具 >
Re: flink的state过期设置
Hi TTL的时间戳实际是会存储在 state 里面 [1],与每个entry在一起,也就是说从Checkpoint恢复的话,数据里面的时间戳是当时插入时候的时间戳。 [1] https://github.com/apache/flink/blob/ba92b3b8b02e099c8aab4b2b23a37dca4558cabd/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L50 祝好 唐云 From: hdxg1101300...@163.com Sent: Thursday, July 2, 2020 11:17 To: user-zh Subject: flink的state过期设置 您好: 想咨询一下关于state的ttl问题; 想问一下 state设置的ttl,如果从checkpoints重启 ttl会不会失效;ttl针对的是process time, 比如我设置的7天过期,重新从checkpoints启动是第一次启动的时间算还是恢复时的新processtime算;他是state的一部分 还是怎么算; 或者要注册定时器来实现 hdxg1101300...@163.com