Re: Re: flink对task分配slot问题

2020-07-02 文章 liuhy_em...@163.com
明白了,非常感谢您的回复!

Thanks,
Hongyang


liuhy_em...@163.com
 
发件人: Xintong Song
发送时间: 2020-07-03 13:02
收件人: user-zh
主题: Re: flink对task分配slot问题
Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个
subtask 不能分配到同一个 slot 中”这样的一个规则。
 
举个例子:
如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。
那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。
所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。
通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。
因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。
 
Thank you~
 
Xintong Song
 
 
 
On Fri, Jul 3, 2020 at 11:12 AM liuhy_em...@163.com 
wrote:
 
> Dear,
>
> 请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗?
> flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗?
>
> Thanks,
> Hongyang
>


kafkaf To mysql 写入问题

2020-07-02 文章 郑斌斌
dear:
   请教两个问题
1)  用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样,
改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225
不知道,用Kafka类的话,如何解决
.connect(
new Kafka()
  .version("0.10")
  .topic("test-input")
2)  对于timestamp类型字段,用JDBCAppendTableSink 
把DataStream写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误
kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的
jdbc参数类型设置为Types.SQL_TIMESTAMP
thanks


Re: flink对task分配slot问题

2020-07-02 文章 Xintong Song
Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个
subtask 不能分配到同一个 slot 中”这样的一个规则。

举个例子:
如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。
那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。
所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。
通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。
因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。

Thank you~

Xintong Song



On Fri, Jul 3, 2020 at 11:12 AM liuhy_em...@163.com 
wrote:

> Dear,
>
> 请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗?
> flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗?
>
> Thanks,
> Hongyang
>


回复:在没有开启Checkpoint的情况下,当作业重启时,历史数据是否会残留在内存中?

2020-07-02 文章 SmileSmile
这种现象只会出现在on rocksdb中。




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 11:21,SmileSmile 写道:

Hi

我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory 
usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。


历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime() 
的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史数据是否一直残留在内存中无法清理?


是否有哪位大佬可以帮忙解惑?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在没有开启Checkpoint的情况下,当作业重启时,历史数据是否会残留在内存中?

2020-07-02 文章 SmileSmile

Hi

我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory 
usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。


历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime() 
的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史数据是否一直残留在内存中无法清理?


是否有哪位大佬可以帮忙解惑?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

flink对task分配slot问题

2020-07-02 文章 liuhy_em...@163.com
Dear,
请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗?
flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗?

Thanks,
Hongyang


Re: Flink job不定期就会重启,版本是1.9

2020-07-02 文章 Xintong Song
从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。
建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。

Thank you~

Xintong Song



On Fri, Jul 3, 2020 at 10:48 AM noon cjihg  wrote:

> Hi,大佬们
>
> Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?
>
> 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator
> flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator
> flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
> 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator
> flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> down.
> 2020-07-01 20:20:43.875 [flink-metrics-16] INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator
> flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
> down.
> 2020-07-01 20:20:43.891 [flink-metrics-16] INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC
> service.
> 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Terminating
> cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
> java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
> at akka.dispatch.OnComplete.internal(Future.scala:263)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/resourcemanager#-781959047]] after [1
> ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
> reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
> at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> ... 9 common frames omitted
>


Flink job不定期就会重启,版本是1.9

2020-07-02 文章 noon cjihg
Hi,大佬们

Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?

2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
2020-07-01 20:20:43.875 [flink-metrics-16] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
down.
2020-07-01 20:20:43.875 [flink-metrics-16] INFO
akka.remote.RemoteActorRefProvider$RemotingTerminator
flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut
down.
2020-07-01 20:20:43.891 [flink-metrics-16] INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC
service.
2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Terminating
cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
java.util.concurrent.CompletionException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/resourcemanager#-781959047]] after [1
ms]. Message of type
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't
send a reply.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
at akka.dispatch.OnComplete.internal(Future.scala:263)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/resourcemanager#-781959047]] after [1
ms]. Message of type
[org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't
send a reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
... 9 common frames omitted


Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 admin
kafka默认分区有序,所以source的并发一般小于等于kafka的partition数,理想状态是1:1
sink的并发一般也是也是和输出topic相关,如果要保证有序,可以按key进行分区,
保证数据均匀可以自定义分区策略,比如roundrobin、shuffle等

> 2020年7月2日 下午6:39,air23  写道:
> 
> hi
> 就是我用
>   flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
>   flink sql 通过ddl写入kafka怎么自定义分区呢?
> 
> 
> 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。
> 
> 
> 
> 
> 



Re: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn
public void invoke(ObjectNode node, Context context) throws Exception {

String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new 
MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" +  meter.getCount());

如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics.
但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。

谢谢,
王磊


wangl...@geekplus.com.cn 

 
Sender: kcz
Send Time: 2020-07-03 09:13
Receiver: wanglei2
Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧



-- 原始邮件 --
发件人: 王磊2 
发送时间: 2020年7月2日 21:46
收件人: user-zh , 17610775726 <17610775...@163.com>
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?


没有明白你说的实现方式。

我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ..., myCounter_tableX
但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。

谢谢,
王磊



--
发件人:JasonLee <17610775...@163.com>
发送时间:2020年7月2日(星期四) 21:12
收件人:user-zh 
主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

你把tablename传到下面metric里不就行了吗


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型)

谢谢,
王磊




wangl...@geekplus.com.cn


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:

官网上的例子:

public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
  this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
  this.counter.inc();
  return value;
}
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn




Re: flink任务提交方式

2020-07-02 文章 admin
Hi,
1.10.x版本以后env.execute()是返回一个JobExecutionResult
对象的,这里面可以获取到job相关信息,比如你想要的jobid

> 2020年7月2日 下午12:09,Dream-底限  写道:
> 
> hi
> 请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具



Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 LakeShen
Hi air23,

  > flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
 你可以为你的程序设置默认的并发度,代码或者命令行参数,配置文件都可以。

>  flink sql 通过ddl写入kafka怎么自定义分区呢?
kafka sink 自定义分区器:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#kafka-connector
,
将 'connector.sink-partitioner'设置为 'custom', 然后设置 '
connector.sink-partitioner-class'. Best,
LakeShen

shizk233  于2020年7月2日周四 下午7:46写道:

> Hi air23,
>
> sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。
> 你可以试试流转表,可以做到细粒度的控制。
>
> Best,
> shizk233
>
> air23  于2020年7月2日周四 下午6:40写道:
>
> > hi
> > 就是我用
> >flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
> >flink sql 通过ddl写入kafka怎么自定义分区呢?
> >
> >
> > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。
> >
> >
> >
> >
> >
> >
>


Re: Flink Training - why cannot keyBy hour?

2020-07-02 文章 Eleanore Jin
Hi David,

Thanks a lot for the explanation!

Eleanore

On Thu, Jul 2, 2020 at 6:30 AM David Anderson  wrote:

> Eleanore,
>
> Yes, if you change the implementation in the way that is suggested by the
> slide, the tests will fail. But it's more interesting to observe the
> behavior in the console.
>
> The notes that go with that slide explain the situation in more detail.
> (Use alt-p or option-p to see the notes). But to recap here, there are two
> related effects:
>
> (1) Instead of producing a single result at the end of the window, this
> alternative implementation produces a result for every event. In other
> words, it produces a stream that eventually arrives at the same maximum
> value produced by the timeWindowAll.
>
> (2) With timeWindowAll, once the results for a given hour have been
> produced, Flink frees the state associated with the window for that hour.
> It knows, based on the watermarking, that no more events are expected, so
> the state is no longer needed and can be cleared. But with maxBy, the state
> for each key (each hour) is kept forever. This is why this is not a good
> approach: the keyspace is unbounded, and we can't intervene to clean up
> stale state.
>
> Regards,
> David
>
> On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin 
> wrote:
>
>> Hi experts,
>>
>> I am going through Ververica flink training, and when doing the lab with
>> window (https://training.ververica.com/exercises/windows), basically it
>> requires to compute within an hour which driver earns the most tip.
>>
>> The logic is to
>> 0. keyBy driverId
>> 1. create 1 hour window based on eventTime
>> 2. sum up all the tips for this driver within this 1 hour window
>> 3. create an 1 hour globalWindow for all drivers
>> 4. find the max tips
>>
>> sample code shown as below.
>>
>> SingleOutputStreamOperator> 
>> aggregatedTipsPerDriver = fares.keyBy(rides -> rides.driverId)
>>  .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>  .process(new SumTipsFunction());
>>
>> // Tuple3: reporting the timestamp for the end of the hour, the driverId, 
>> and the total of that driver's tips for that hour
>> SingleOutputStreamOperator> hourlyMax =
>>  
>> aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
>>  .maxBy(2);
>>
>>
>> The question is shown as 4th slide: why we cannot keyed by the hour?
>>
>> If I change the implementation to keyBy hour and run the HourlyTipsTest,
>>
>> the test of testMaxAcrossDrivers will fail:
>>
>> // (94668840,1,6.0) -> for timestamp window: 94668840, driverId: 1, 
>> earns most tip: 6.0
>>
>> Expected :[(94668840,1,6.0), (94669200,2,20.0)]
>> Actual   :[(94668840,1,6.0), (94669200,2,20.0), 
>> (94669200,2,20.0)]
>>
>>
>> [image: image.png]
>>
>> Thanks a lot!
>> Eleanore
>>
>>


回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 王磊2

没有明白你说的实现方式。

我最终要得到类似的 Metrics:  myCounter_table1, myCounter_table2, ..., myCounter_tableX
但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。

谢谢,
王磊



--
发件人:JasonLee <17610775...@163.com>
发送时间:2020年7月2日(星期四) 21:12
收件人:user-zh 
主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

你把tablename传到下面metric里不就行了吗


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型)

谢谢,
王磊




wangl...@geekplus.com.cn


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:

官网上的例子:

public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
  this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
  this.counter.inc();
  return value;
}
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn




回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 JasonLee
你把tablename传到下面metric里不就行了吗


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道:

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型)

谢谢,
王磊




wangl...@geekplus.com.cn


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:

官网上的例子:

public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
  this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
  this.counter.inc();
  return value;
}
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn



Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 shizk233
Hi air23,

sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。
你可以试试流转表,可以做到细粒度的控制。

Best,
shizk233

air23  于2020年7月2日周四 下午6:40写道:

> hi
> 就是我用
>flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
>flink sql 通过ddl写入kafka怎么自定义分区呢?
>
>
> 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。
>
>
>
>
>
>


Re: Position out of bounds.

2020-07-02 文章 xuhaiLong
感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


On 7/2/2020 18:39,夏帅 wrote:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
if (this.position >= this.buffer.length) {
resize(1);
}
this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.


flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



回复:Position out of bounds.

2020-07-02 文章 夏帅
不好意思,看错了,这里是自增了





来自钉钉专属商务邮箱--
发件人:xuhaiLong
日 期:2020年07月02日 18:46:37
收件人:夏帅
抄 送:user-zh
主 题:回复:Position out of bounds.

感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


| |
夏*
|
|
邮箱:xiagu...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月02日 18:39,夏帅 写道:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
  if (this.position >= this.buffer.length) {
 resize(1);
  }
  this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.


flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5




回复:Position out of bounds.

2020-07-02 文章 xuhaiLong
感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


| |
夏*
|
|
邮箱:xiagu...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月02日 18:39,夏帅 写道:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
  if (this.position >= this.buffer.length) {
 resize(1);
  }
  this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.

 
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 air23
hi
就是我用
   flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
   flink sql 通过ddl写入kafka怎么自定义分区呢?


这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。





 

回复:Position out of bounds.

2020-07-02 文章 夏帅
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
   if (this.position >= this.buffer.length) {
  resize(1);
   }
   this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.

  
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
 程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



rocksdb的block cache usage应该如何使用

2020-07-02 文章 SmileSmile

通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Position out of bounds.

2020-07-02 文章 xuhaiLong
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
 https://www.helloimg.com/image/Pe1QR
 程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5

2020-07-02 17:06:19,409 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d847db42ed1d92ac373f9ccf27b846f0) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,410 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (3/3) (ca825ba9712eb520ff6de6b0f9de4dc1) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,426 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (3/3) (b05f5b66fd4c65a9032bb0140a4ce3d1) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,427 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,472 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- groupBy: 
(userId, sourceUuid, categoryId, gender), select: (userId, sourceUuid, 
categoryId, gender, MAX(siteId) AS siteId, MAX(score) AS score) -> select: 
(userId, categoryId, gender, score, siteId) (1/3) 
(f8f49357b9121d97816d5f83569cd6ac) switched from DEPLOYING to RUNNING.
2020-07-02 17:06:19,472 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (37befed4aefab35588e5f6d4c372b8c4) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,492 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (2de7f2a3809e8d7e97197cbc0f7c8b4b) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,498 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (1fe6456a019617839a573d55b1194541) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:52,263 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from RUNNING to 
FAILED on 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@61d8240a.
java.lang.IllegalArgumentException: Position out of bounds.
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at 
org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:368)
at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:189)
at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(RocksDBSerializedCompositeKeyBuilder.java:144)
at 
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:149)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:120)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:61)
at 
org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:60)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:93)
at 
org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
at 
com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:272)
at 
com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:237)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
   

回复: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn

全都是同一种类型的 metrics.
比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 metrics(但都是 
meter 类型) 

谢谢,
王磊




wangl...@geekplus.com.cn 


发件人: JasonLee
发送时间: 2020-07-02 16:16
收件人: user-zh
主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
是要生成不同类型的metric吗 比如counter meter ?
 
 
| |
JasonLee
|
|
邮箱:17610775...@163.com
|
 
Signature is customized by Netease Mail Master
 
在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:
 
官网上的例子:
 
public class MyMapper extends RichMapFunction {
private transient Counter counter;
@Override
public void open(Configuration config) {
   this.counter = getRuntimeContext()
 .getMetricGroup()
 .counter("myCounter");
}
@Override
public String map(String value) throws Exception {
   this.counter.inc();
   return value;
}
}
 
我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?
 
谢谢,
王磊
 
 
 
wangl...@geekplus.com.cn
 


回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 JasonLee
是要生成不同类型的metric吗 比如counter meter ?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道:

官网上的例子:

public class MyMapper extends RichMapFunction {
 private transient Counter counter;
 @Override
 public void open(Configuration config) {
   this.counter = getRuntimeContext()
 .getMetricGroup()
 .counter("myCounter");
 }
 @Override
 public String map(String value) throws Exception {
   this.counter.inc();
   return value;
 }
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn



flink asynctablefunction调用异常

2020-07-02 文章 sunfulin
hi,
我在使用flink 1.10.1 blink 
planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by : 
java.lang.Exception: Could not complete the stream element: 
org.apache.flink.table.dataformat.BinaryRow  caused by : 
java.util.concurrent.TimeoutException: Async function call has timed out.


我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。

在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?

2020-07-02 文章 wangl...@geekplus.com.cn

官网上的例子:

public class MyMapper extends RichMapFunction {
  private transient Counter counter;
  @Override
  public void open(Configuration config) {
this.counter = getRuntimeContext()
  .getMetricGroup()
  .counter("myCounter");
  }
  @Override
  public String map(String value) throws Exception {
this.counter.inc();
return value;
  }
}

我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢?

谢谢,
王磊



wangl...@geekplus.com.cn



Re: flink基于yarn的HA次数无效,以及HA拉起的任务是否可以重用state

2020-07-02 文章 Paul Lam
判断 Attempt 失败的标准是 Flink 通过 AMRMClientAsyncImpl 通知 YARN RM Application 
失败并注销自己,所以 kill jm 是不算的。

Best,
Paul Lam

> 2020年7月2日 11:09,liangji  写道:
> 
> 我之前配置了HA,也配置了flink中yarn-attempts=2,结果是kill jm进程可以无限重启
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 关于flink sql问题

2020-07-02 文章 forideal
Hi 本超



  关于Mysql 做维表,关掉cache后的优化手段,有什么建议吗?



比如,20k records per second 的流量,关掉 cache 会对 mysql 产生很大的压力。不知道 MySQL Lookup 做成 
async + batch 会不会提升性能或者有副作用。



Best forideal.







在 Benchao Li ,2020年7月1日 13:22写道:


我理解你只需要把这同一个Mysql表再做一个维表即可。可以写两次DDL,一个给维表用,一个给sink用。
如果你就觉得它是实时变化的,你可以把维表的cache关掉,保证每次都是获取Mysql中最新的数据就可以了吧?

当然了,在DDL的时候并没有区分这个表是维表还是sink表,具体它是什么类型,只是根据你在SQL里面怎么使用来决定的。
理论上来讲,你一个DDL可以同时做维表也可以做Sink。(只是它们可能有些配置会不同,分开写两个DDL应该是更清晰一些)

zya  于2020年6月30日周二 下午11:26写道:

> 请问下,sink写出的表能做维表吗,因为sink会一直写入,做维表的话会一直动态变化
>
>
>
>
>
> 
>
>
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年6月30日(星期二) 晚上11:14
> 收件人:"user-zh"
> 主题:Re: 关于flink sql问题
>
>
>
> 应该做一个维表Join就可以了。
>
>
> zya 
>  Hi 各位,有个问题想请教一下:
>  nbsp; nbsp; 目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql,
> 
> 
> nbsp;nbsp;nbsp;nbsp;在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗?
>
>
>
> --
>
> Best,
> Benchao Li



--

Best,
Benchao Li


Re: flink任务提交方式

2020-07-02 文章 Dream-底限
好的,感谢

On Thu, Jul 2, 2020 at 12:37 PM jianxu  wrote:

> 你可以看下这个项目https://github.com/todd5167/clusters-submiter,改造下应该满足你的需求。
> 在 2020-07-02 12:09:05,"Dream-底限"  写道:
> >hi
>
> >请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具
>


Re: flink的state过期设置

2020-07-02 文章 Yun Tang
Hi

TTL的时间戳实际是会存储在 state 里面 [1],与每个entry在一起,也就是说从Checkpoint恢复的话,数据里面的时间戳是当时插入时候的时间戳。

[1] 
https://github.com/apache/flink/blob/ba92b3b8b02e099c8aab4b2b23a37dca4558cabd/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L50

祝好
唐云


From: hdxg1101300...@163.com 
Sent: Thursday, July 2, 2020 11:17
To: user-zh 
Subject: flink的state过期设置

您好:
想咨询一下关于state的ttl问题;
想问一下 state设置的ttl,如果从checkpoints重启 ttl会不会失效;ttl针对的是process time,
比如我设置的7天过期,重新从checkpoints启动是第一次启动的时间算还是恢复时的新processtime算;他是state的一部分 还是怎么算;
   或者要注册定时器来实现



hdxg1101300...@163.com