回复: 回复:MapState 无法更新问题

2021-03-09 Thread 明启 孙
你虽然调整了实现方法的顺序,但是这个程序执行顺序还是先执行processElement(),后执行processBroadcastElement()


发件人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/
c



回复: 回复:MapState 无法更新问题

2021-03-09 Thread 明启 孙
A read-only view of the {@link BroadcastState}.
*
* Although read-only, the user code should not modify the value returned by 
the {@link
* #get(Object)} or the entries of the immutable iterator returned by the {@link
* #immutableEntries()}, as this can lead to inconsistent states. The reason for 
this is that we do
* not create extra copies of the elements for performance reasons.
*
* @param  The key type of the elements in the {@link ReadOnlyBroadcastState}.
* @param  The value type of the elements in the {@link 
ReadOnlyBroadcastState}.
*/
这是源码中对ReadOnlyBroadcastState的描述,希望对你有帮助


smq

发件人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/
c



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 Thread Leonard Xu
你好,
你的flink版本是多少?
之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。

我在[1]里修复了,你可以升级对应的版本试下。


祝好,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-21013 
 



> 在 2021年3月10日,14:34,HunterXHunter <1356469...@qq.com> 写道:
> 
> 再试了一下:
> 修改并行度也不行
>.setParallelism(9)
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 Thread HunterXHunter
再试了一下:
修改并行度也不行
.setParallelism(9)




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
希望有大佬给下这些参数的区别。如果环境的网络不好,纠结要调整哪个参数?还是哪些参数。 我目前只提高了 ask.timeout 。
目前看配置,太多与timeout相关的参数了。

akka.ask.timeout
akka.lookup.timeout
akka.retry-gate-closed-for
akka.tcp.timeout
akka.startup-timeout

heartbeat.interval
heartbeat.timeout

high-availability.zookeeper.client.connection-timeout
high-availability.zookeeper.client.session-timeout


taskmanager.network.request-backoff.max

...

yidan zhao  于2021年3月10日周三 下午1:13写道:

> 今天对比了下G1和copy-MarkSweepCompact的效果。
> 运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
> 1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。
>
> yidan zhao  于2021年3月9日周二 下午7:30写道:
>
>> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>>
>> yidan zhao  于2021年3月9日周二 下午7:26写道:
>>
>>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>>> 然后目前通过Flink的web-ui看了下gc情况。
>>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>>
>>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>>
>>>
>>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>>
 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
 在 2021-03-09 14:57:43,"yidan zhao"  写道:
 >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
 >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
 >
 >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
 >
 >yidan zhao  于2021年3月9日周二 下午2:56写道:
 >
 >> 好的,我会看下。
 >> 然后我今天发现我好多个集群GC collector不一样。
 >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
 >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
 >> threads,还有一种是Mark Sweep Compact GC。
 >> 大佬们,Flink是根据内存大小有什么动态调整吗。
 >>
 >>
 >>
 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
 >>
 >>
 >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
 >>
 >>> Hi,
 >>>
 >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
 >>>
 >>> Best,
 >>> jjiey
 >>>
 >>> > 2021年3月8日 14:37,yidan zhao  写道:
 >>> >
 >>> >
 >>>
 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
 >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e
 lost
 >>> > leadership’ 错导致任务重启。
 >>> >
 >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
 >>> > 2021-03-08 14:31:40
 >>> > org.apache.flink.runtime.io
 >>> .network.netty.exception.RemoteTransportException:
 >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
 >>> > CreditBasedPartitionRequestClientHandler.java:294)
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > CreditBasedPartitionRequestClientHandler.channelRead(
 >>> > CreditBasedPartitionRequestClientHandler.java:183)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
 >>> > .java:357)
 >>> >at org.apache.flink.runtime.io.network.netty.
 >>> > NettyMessageClientDecoderDelegate.channelRead(
 >>> > NettyMessageClientDecoderDelegate.java:115)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
 >>> > .java:357)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 >>>
 DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
 >>> > 1410)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:379)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> > AbstractChannelHandlerContext.invokeChannelRead(
 >>> > AbstractChannelHandlerContext.java:365)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
 >>> >
 DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
 >>> > 

Re: 如何自定义web-ui上对taskmanager的标识符

2021-03-09 Thread yidan zhao
感觉可以在subtask和taskmanager的2个tab中新加上显示 id
的功能。如何taskmanager支持自定义自己的id(一个用户自定义的随意不重复的id)。

yidan zhao  于2021年3月10日周三 下午2:27写道:

> 如题,目前有几个地方的需求。
> (1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
>
> (2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。
>
>
>


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 Thread HunterXHunter
经过再一次验证:
即使我做group by rowtime的操作,
我对datastream做keyby(rowtime) 也有这个问题
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test "));
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
.keyby(_.f1)
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
);

结果也是无法触发窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flinkx????????????????

2021-03-09 Thread ??????
??
??2021??flinkx??
flnkx??jsonbegin1??end1begin2??reader??end2??writer??

{
"job": {
  "content": [
   {
"reader": {
 "parameter": {
  "username": "root",
  "password": "123456",
  "connection": [{
   "jdbcUrl": 
["jdbc:mysql://localhost:3306/flinkx1?useUnicode=truecharacterEncoding=utf8"],
   "table": ["begin1","begin2"]
  }],
  "column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "varchar"
}
  ],
  "customSql": "",
  "splitPk": "id",
  "queryTimeOut": 1000,
  "requestAccumulatorInterval": 2,
  "increColumn": "id",
  "startLocation": ""
 },
 "name": "mysqlreader"
},
"writer": {
 "name": "mysqlwriter",
 "parameter": {
  "username": "root",
  "password": "123456",
  "connection": [
   {
"jdbcUrl": 
"jdbc:mysql://localhost:3306/flinkx1?useUnicode=truecharacterEncoding=utf8",
"table": ["end1","end2"]
   }
  ],
  "writeMode": "insert",
  "column": [

 {
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "varchar"
}
  ],
  "batchSize": 1024
 }
}
   }
  ],
  "setting": {
   "speed": {
"channel": 2,
"bytes": 1000
   },
   "errorLimit": {
"record": 0
   },
   "restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
   },
   "log" : {
"isLogger": false,
"level" : "debug",
"path" : " /data/flinkx/flinkxconf/log/",
"pattern":""
   }
  }
 }
}

flinkx????????

2021-03-09 Thread ??????
??
??2021??flinkx??
flnkx??jsonbegin1??end1begin2??reader??end2??writer??

{
"job": {
  "content": [
   {
"reader": {
 "parameter": {
  "username": "root",
  "password": "123456",
  "connection": [{
   "jdbcUrl": 
["jdbc:mysql://localhost:3306/flinkx1?useUnicode=truecharacterEncoding=utf8"],
   "table": ["begin1","begin2"]
  }],
  "column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "varchar"
}
  ],
  "customSql": "",
  "splitPk": "id",
  "queryTimeOut": 1000,
  "requestAccumulatorInterval": 2,
  "increColumn": "id",
  "startLocation": ""
 },
 "name": "mysqlreader"
},
"writer": {
 "name": "mysqlwriter",
 "parameter": {
  "username": "root",
  "password": "123456",
  "connection": [
   {
"jdbcUrl": 
"jdbc:mysql://localhost:3306/flinkx1?useUnicode=truecharacterEncoding=utf8",
"table": ["end1","end2"]
   }
  ],
  "writeMode": "insert",
  "column": [

 {
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "varchar"
}
  ],
  "batchSize": 1024
 }
}
   }
  ],
  "setting": {
   "speed": {
"channel": 2,
"bytes": 1000
   },
   "errorLimit": {
"record": 0
   },
   "restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
   },
   "log" : {
"isLogger": false,
"level" : "debug",
"path" : " /data/flinkx/flinkxconf/log/",
"pattern":""
   }
  }
 }
}

Re: 回复:MapState 无法更新问题

2021-03-09 Thread chaos
你好,

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {

private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}

override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}

override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {

val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = 
ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue


for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) 
tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


如何自定义web-ui上对taskmanager的标识符

2021-03-09 Thread yidan zhao
如题,目前有几个地方的需求。
(1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
(2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。


回复:MapState 无法更新问题

2021-03-09 Thread smq
可以贴个完整的代码吗





-- 原始邮件 --
发件人: chaos http://apache-flink.147419.n8.nabble.com/

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
今天对比了下G1和copy-MarkSweepCompact的效果。
运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。

yidan zhao  于2021年3月9日周二 下午7:30写道:

> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>
> yidan zhao  于2021年3月9日周二 下午7:26写道:
>
>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>> 然后目前通过Flink的web-ui看了下gc情况。
>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>
>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>
>>
>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>
>>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>>> >
>>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>>> >
>>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>>> >
>>> >> 好的,我会看下。
>>> >> 然后我今天发现我好多个集群GC collector不一样。
>>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>>> >> threads,还有一种是Mark Sweep Compact GC。
>>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>> >>
>>> >>
>>> >>
>>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>> >>
>>> >>
>>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>> >>>
>>> >>> Best,
>>> >>> jjiey
>>> >>>
>>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >>> >
>>> >>> >
>>> >>>
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> >>> > leadership’ 错导致任务重启。
>>> >>> >
>>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> >>> > 2021-03-08 14:31:40
>>> >>> > org.apache.flink.runtime.io
>>> >>> .network.netty.exception.RemoteTransportException:
>>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> >>> > .java:357)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>>> >>> > NettyMessageClientDecoderDelegate.java:115)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> >>> > .java:357)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>>> >>> > 1410)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>>> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>>> >>> > AbstractEpollStreamChannel.java:792)
>>> >>> >at
>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> >>> > .processReady(EpollEventLoop.java:475)
>>> >>> >at
>>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> >>> > .run(EpollEventLoop.java:378)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> >>> >at 

MapState 无法更新问题

2021-03-09 Thread chaos
你好,我在使用广播流的时候定义了一个MapState,并在逻辑处理中往其中放数据,但是我始终没法成功更新其值,忘解惑。 

定义:
private val carEfenceState: MapState[String, Boolean] = new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean])

存值:
carEfenceState.put(mapKey, true)

取值:
carEfenceState.get(mapKey)

取到的值始终为 false.

Thanks in advance!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-03-09 Thread Alexey Trenikhun
Hi Yang,
The problem is re-occurred, full JM log is attached

Thanks,
Alexey

From: Yang Wang 
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

It seems that the KubernetesHAService works well since all the checkpoints have 
been cleaned up when the job is canceled.
And we could find related logs "Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering 
from a wrong savepoint path. Could you share the
full JobManager logs? One possible reason I could guess is the application 
cluster entrypoint is not creating a new JobGraph from the specified arguments.


Best,
Yang

Alexey Trenikhun mailto:yen...@msn.com>> 于2021年2月27日周六 上午1:48写道:
Hello,
We have Flink job running in Kubernetes with Kuberenetes HA enabled (JM is 
deployed as Job, single TM as StatefulSet). We taken savepoint with 
cancel=true. Now when we are trying to start job using --fromSavepoint A, where 
is A path we got from taking savepoint (ClusterEntrypoint reports A in log), 
but looks like Job for some reason ignores given A and actually trying to 
restore from some path B (CheckpointCoordinator logs B ):

{"ts":"2021-02-26T17:09:52.500Z","message":" Program 
Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
/opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.501Z","message":"
--job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:52.502Z","message":"
","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
...
{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already 
downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during 
restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
{"ts":"2021-02-26T17:09:59.183Z","message":"Starting job 
 from savepoint 
wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-fbcd58f66685
 (allowing non restored 
state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}

Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 Thread 9722
有啊,现在很多公司都做了数据平台,并且对外出售,你可以找一个公司的产品试用,然后照着它的功能,自己开发

> 2021年3月6日 16:00,Jacob <17691150...@163.com> 写道:
> 
> 我们现在提交Flink Job 是通过flink客户端run命令提交job,进行实时任务的计算,每次提交都要登录prd机器,上传jar包,过程比较麻烦。 
> 
> 
> 后期规划把实时计算的任务集成到我们已有的一个系统中,把上面描述的过程封装起来,给用户提供一些按钮、菜单等,理想状态下,在这个系统增加一些模块、菜单之类的东西,就能完成对Job的维护,包括提交Job、查看正在运行的Job、停止Job等等
>   
> 
> 上面所说的这个系统是我们自研的一个数据处理平台,实时计算任务也是其中的一环,因此就想把实时计算的任务的模块也集成到其中去。
> 
> 
> 不知道这有没有可能实现
> 
> 请大佬提供些许思路!感谢
> 
> 
> 
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 Thread silence
个人也维护了个flink平台的开源项目,希望可以帮助到你
https://github.com/hairless/plink



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 Thread HunterXHunter
https://github.com/zhp8341/flink-streaming-platform-web
这个你可以参考下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Pyflink 提交到本地集群报错

2021-03-09 Thread Shuiqiang Chen
Huilin 你好,

你用的是哪个版本的Flink呢?

Huilin_WU <592900...@qq.com> 于2021年3月10日周三 上午9:39写道:

> 我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py
> xx.py就会报上面的错误说没有pyflink的组件。
> (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m
> localhost:8081 -py demo_predict.py
> Traceback (most recent call last):
>   File "demo_predict.py", line 51, in 
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
>
> 我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: spark 3.1.1 support hive 1.2

2021-03-09 Thread jiahong li
omit it !!!

jiahong li  于2021年3月10日周三 上午10:27写道:

> hi,sorry to bother you.In spark 3.0.1,hive-1.2 is supported,but in spark
> 3.1.x maven profile hive-1.1 is removed.Is that means hive-1.2 does not
> supported  in spark 3.1.x? how can i support hive-1.2 in spark 3.1.x,or any
> jira? can anyone help me ?
>


Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-09 Thread Fuyao Li
Hi Flink Community,

After configuring the JDBC timeout time, I still could not get rid of the issue.
https://issues.apache.org/jira/browse/FLINK-21674
I created a JIRA task to describe the problem. Any suggestion is appreciated.

Best regards,
Fuyao

From: Fuyao Li 
Date: Wednesday, March 3, 2021 at 15:14
To: XU Qinghui 
Cc: user , Timo Walther 
Subject: Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after 
some idle time
Hi Qinghui,

I agree. I am trying to found internal and resources on the internet to fix the 
issue.  Idle Time 
Limits
 might be a reason. But after configuring those parameters and updated the 
sqlnet.ora to
WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

SQLNET.EXPIRE_TIME is kind of like heartbeat thing to keep the connection alive.

It still doesn’t work after all of these configurations. Pretty weird…

I will post a follow up if I could find the answer… Thanks.

BR,
Fuyao


From: XU Qinghui 
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li 
Cc: user , Timo Walther 
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some 
idle time
It sounds like the jdbc driver's connection is closed somehow, and probably has 
nothing to do with flink itself.
Maybe you could check if there's some settings on the db that could close the 
connection after some inactivity, or otherwise it could be your network drops 
the inactive tcp connection after some time (you can try to use tcp keepalive 
in this case).

BR,


Le mar. 2 mars 2021 à 19:38, Fuyao Li 
mailto:fuyao...@oracle.com>> a écrit :
Sorry for the uncompleted email.

Error log of broken pipeline, the failed SQL will be executed after checkpoint 
automatic recovery. Please share some ideas on this issue. Really appreciate 
it. Thanks!

09:20:02,868 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing 
records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 

spark 3.1.1 support hive 1.2

2021-03-09 Thread jiahong li
hi,sorry to bother you.In spark 3.0.1,hive-1.2 is supported,but in spark
3.1.x maven profile hive-1.1 is removed.Is that means hive-1.2 does not
supported  in spark 3.1.x? how can i support hive-1.2 in spark 3.1.x,or any
jira? can anyone help me ?


Re: Is Ververica Connector Redis open source?

2021-03-09 Thread Yik San Chan
Hi Yun Tang,

Thanks for the information.

Best,
Yik San Chan

On Wed, Mar 10, 2021 at 1:07 AM Yun Tang  wrote:

> Hi Yik,
>
> As far as I know, the source code of ververica connector is not public,
> and you could refer to [1] for open-source implementation.
>
>
> [1]
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> Best
> Yun Tang
>
>
>
> --
> *From:* Yik San Chan 
> *Sent:* Tuesday, March 9, 2021 12:01
> *To:* user 
> *Subject:* Is Ververica Connector Redis open source?
>
> Hi community,
>
> I found this package
> https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-redis/1.11-vvr-2.1.3
> in Maven Repository. However, I cannot find it anywhere in GitHub. Does
> anyone know this is open source or not?
>
> Thank you!
>
> Best,
> Yik San Chan
>


Pyflink 提交到本地集群报错

2021-03-09 Thread Huilin_WU
我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py
xx.py就会报上面的错误说没有pyflink的组件。
(base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m
localhost:8081 -py demo_predict.py
Traceback (most recent call last):
  File "demo_predict.py", line 51, in 
from pyflink.common.serialization import SimpleStringEncoder
ModuleNotFoundError: No module named 'pyflink.common.serialization'

我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Extracting state keys for a very large RocksDB savepoint

2021-03-09 Thread Andrey Bulgakov
Hi all,

I'm trying to use the State Processor API to extract all keys from a
RocksDB savepoint produced by an operator in a Flink streaming job into CSV
files.

The problem is that the storage size of the savepoint is 30TB and I'm
running into garbage collection issues no matter how much memory in
different proportions or CPU cores I allocate to task managers. (I tried
allocating up to 120GB and 16 cores to each task).

The same program and hardware configuration works with no problems for a
smaller savepoint (300GB), it's some sort of a scalability issue here.

At the beginning the tasks spend a couple hours in what I call "the
download phase". During that phase heap usage as indicated by metrics and
Flink UI is at about 10% and everything is going great.

But at certain point heap usage for tasks coming out of the download phase
starts to go up, climbs up to about 87% usage as indicated in Flink UI and
by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap
usage metric doesn't increase anymore and JVM starts spending a lot of time
collecting garbage and keeping all CPUs 100% loaded. After some time in
this mode the job crashes with "java.util.concurrent.TimeoutException:
Heartbeat of TaskManager with id container_1614821414188_0002_01_35
timed out."

At all times the indicated managed memory usage is 0%. Which seems
suspicious since RocksDB is supposed to be using it?

Also, judging by the lack of an application metric I have in the state
processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate if somebody helped answering some of my questions or
suggested a way I could further diagnose/fix this:

1. Is it normal that this overwhelming garbage collection starts long
before reaching 100% heap usage? At the time it happens there 's usually
10-15GB of heap showing up as available.

2. Am I correct to assume that even in batch mode Flink implements memory
back pressure and is supposed to slow down processing/allocations when it's
low on available heap memory?

3. If #2 is true, is it possible that due to some misconfiguration Flink
considers more heap space to be available than there actually is and keeps
allocating even though there's no more heap?

4. As an alternative to #3, is it possible that there are some unaccounted
heap allocations that are not shown in the UI and by the metric and
therefore not taken into account by the memory back pressure mechanism?

Here's the minimal code example that demonstrates the issue:
https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f

I'm running this on Flink 12.2 (and many earlier versions, too) with the
following base configuration and parallelism of 80 (tried lowering that to
have more resources available, too):
https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025

I tried many things with no success:
- reducing parallelism and making more resources available to each task
manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple
allocations
- manipulating memory ratios to allocate more memory to be used as heap,
managed
- allocating 20% of memory for JVM overhead
- switching to G1GC garbage collector

Again, would appreciate any help with this.

-- 
With regards,
Andrey Bulgakov


Re: Re: How to check checkpointing mode

2021-03-09 Thread Alexey Trenikhun
Hi Yun,
It is confusing but UI now shows expected value "At Least Once" (obviously 
checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I've 
either looked in wrong place or job was not upgraded when I changed 
checkpointing mode ...

Sorry for noise and thank you for your help

Alexey


From: Yun Gao 
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: Re: How to check checkpointing mode

Hi Alexey,

Sorry I also do not see problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode ?

Best,
Yun

--Original Mail --
Sender:Alexey Trenikhun 
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List , Yun Gao 

Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking, job creation is quite big, I've truncated helper methods 
dealing with command line parameters etc, below two major methods:


@Override

public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());


  if (!allParameters.isEmpty()) {
// We don't expect any parameters, but Flink 1.12 adds JVM options to job 
args, since we add
// -- after jobs argument, this unnecessary for us arguments will be 
treated as positional
// parameters, which we ignore but log warning
LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
see.execute(name);
return null;
  } catch (InterruptedException e) {
LOGGER.error("Stream Processor was interrupted", e);
Thread.currentThread().interrupt();
throw e;
  } catch (Exception e) {
LOGGER.error("Stream Processor is terminated due to exception", e);
throw e;
  }
}


private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws 
IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
  .getExecutionEnvironment()
  .enableCheckpointing(checkpointInterval.toMillis(),
  CheckpointingMode.AT_LEAST_ONCE)
  .setMaxParallelism(1024)
  .setParallelism(parallelism);
  if (externalizedCheckpoints) {
see.getCheckpointConfig()

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
  .putAll(kafkaCommonOptions)
  .putAll(kafkaProducerOptions)
  .varFiles(valueFiles)
  .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
  .semantic(Semantic.AT_LEAST_ONCE)
  .config(producerProperties)
  .build();

  final AutoTopic autoTopic = AutoTopic.builder()
  .config(producerProperties)
  .partitions(autoCreateTopicsPartitions)
  .replicationFactor(autoCreateTopicsReplicationFactor)
  .doNotCreateTopics(ImmutableSet.of(
  gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
  ))
  .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 
Time.minutes(1)));
  // since Flink 1.12 default stream characteristic is event time,
  // so we don't need to set streamTimeCharacteristic, furthermore whole 
TimeCharacteristic enum
  // is deprecated.
  // If needed explicitly using processing-time windows and timers works in 
event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
  new TStateCleanupOnTimeout.Factory(
  maxCallDuration,
  postmortemCallDuration,
  globalAppConfig.timerGranularity()
  );

  @Nullable final SingleOutputStreamOperator cfgXform;
  @Nullable final DataStream cfgSource = addSources(see,
  SourceTopic.GCA_CFG,
  new CfgJsonDeserializationSchema(),
  (event, timestamp) -> event.getBatchId(),
  it -> !it.getHeartbeat());

  if (cfgSource != null) {
cfgXform = cfgSource
.keyBy(PbCfgDatum::getCcId)
.process(new CfgTransform())
.uid("xform-cfg")
.name("XForm Config");

if (!isNullOrEmpty(gspCfg)) {
  cfgXform.addSink(producerFactory.create(gspCfg,
  autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg
  .uid("uid-" + gspCfg)
  .name(gspCfg);
} else {
  cfgXform.addSink(new DiscardingSink<>())
  .uid("uid-gsp-cfg-null")
  .name("gsp-cfg-null");
}
  } else {
cfgXform = null;
  }

  final DataStream voiceCallThreadSource = addSources(see,
  SourceTopic.VOICE_CALL_THREAD,
  callThreadFormat == KafkaTopicFormat.JSON
  ? new TJsonDeserializationSchema()
  : new CallEventDeserializationSchema(),
  (event, 

Re: Flink Read S3 Intellij IDEA Error

2021-03-09 Thread sri hari kali charan Tummala
my stack overflow question.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Here is my Intellij question.
>
>
> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>
> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>>
>> Hi Flink Experts,
>>>
>>
>> I am trying to read an S3 file from my Intellij using Flink I am.comimg
>>> across Aws Auth error can someone help below are all the details.
>>>
>>
>>
>>> I have Aws credentials in homefolder/.aws/credentials
>>>
>>
>> My Intellij Environment Variables:-
>>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>>
>>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>>
>>> flink-conf.yaml file content:-
>>>
>>> fs.hdfs.hadoopconf: 
>>> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>>>
>>> core-site.xml file content:-
>>>
>>> 
>>> 
>>>
>>> 
>>> 
>>> fs.s3.impl
>>> org.apache.hadoop.fs.s3a.S3AFileSystem
>>> 
>>>
>>> 
>>> fs.s3.buffer.dir
>>> /tmp
>>> 
>>>
>>> 
>>> fs.s3a.server-side-encryption-algorithm
>>> AES256
>>> 
>>>
>>> 
>>>
>>> 
>>> fs.s3a.aws.credentials.provider
>>> org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
>>> 
>>> 
>>> fs.s3a.access.key
>>> 
>>> 
>>> 
>>> fs.s3a.secret.key
>>> 
>>> 
>>> 
>>> fs.s3a.session.token
>>> 
>>> 
>>>
>>> 
>>> fs.s3a.proxy.host
>>> 
>>> 
>>> 
>>> fs.s3a.proxy.port
>>> 8099
>>> 
>>> 
>>> fs.s3a.proxy.username
>>> 
>>> 
>>> 
>>> fs.s3a.proxy.password
>>> 
>>> 
>>>
>>> 
>>>
>>> POM.xml file:-
>>>
>>> 
>>> http://maven.apache.org/POM/4.0.0;
>>>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>> 4.0.0
>>>
>>> FlinkStreamAndSql
>>> FlinkStreamAndSql
>>> 1.0-SNAPSHOT
>>> 
>>> src/main/scala
>>> 
>>> 
>>> 
>>> net.alchim31.maven
>>> scala-maven-plugin
>>> 3.1.3
>>> 
>>> 
>>> 
>>> compile
>>> testCompile
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> org.apache.maven.plugins
>>> maven-surefire-plugin
>>> 2.13
>>> 
>>> false
>>> true
>>> 
>>> 
>>> 
>>> **/*Test.*
>>> **/*Suite.*
>>> 
>>> 
>>> 
>>>
>>> 
>>> 
>>> maven-assembly-plugin
>>> 2.4.1
>>> 
>>> 
>>> jar-with-dependencies
>>> 
>>> 
>>> 
>>> 
>>> make-assembly
>>> package
>>> 
>>> single
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-core
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-core
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-clients_2.11
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.derby
>>> derby
>>> 10.13.1.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-jdbc_2.11
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-table-api-scala_2.11
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-table-api-java
>>> 1.8.1
>>> 
>>>
>>>
>>> 
>>> org.apache.flink
>>> flink-table
>>> 1.8.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-table-planner_2.11
>>> 1.8.1
>>> 
>>>
>>>
>>> 
>>> org.apache.flink
>>> 

Re: Flink Read S3 Intellij IDEA Error

2021-03-09 Thread sri hari kali charan Tummala
Here is my Intellij question.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

>
> Hi Flink Experts,
>>
>
> I am trying to read an S3 file from my Intellij using Flink I am.comimg
>> across Aws Auth error can someone help below are all the details.
>>
>
>
>> I have Aws credentials in homefolder/.aws/credentials
>>
>
> My Intellij Environment Variables:-
>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>
>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>
>> flink-conf.yaml file content:-
>>
>> fs.hdfs.hadoopconf: 
>> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>>
>> core-site.xml file content:-
>>
>> 
>> 
>>
>> 
>> 
>> fs.s3.impl
>> org.apache.hadoop.fs.s3a.S3AFileSystem
>> 
>>
>> 
>> fs.s3.buffer.dir
>> /tmp
>> 
>>
>> 
>> fs.s3a.server-side-encryption-algorithm
>> AES256
>> 
>>
>> 
>>
>> 
>> fs.s3a.aws.credentials.provider
>> org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
>> 
>> 
>> fs.s3a.access.key
>> 
>> 
>> 
>> fs.s3a.secret.key
>> 
>> 
>> 
>> fs.s3a.session.token
>> 
>> 
>>
>> 
>> fs.s3a.proxy.host
>> 
>> 
>> 
>> fs.s3a.proxy.port
>> 8099
>> 
>> 
>> fs.s3a.proxy.username
>> 
>> 
>> 
>> fs.s3a.proxy.password
>> 
>> 
>>
>> 
>>
>> POM.xml file:-
>>
>> 
>> http://maven.apache.org/POM/4.0.0;
>>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0
>>
>> FlinkStreamAndSql
>> FlinkStreamAndSql
>> 1.0-SNAPSHOT
>> 
>> src/main/scala
>> 
>> 
>> 
>> net.alchim31.maven
>> scala-maven-plugin
>> 3.1.3
>> 
>> 
>> 
>> compile
>> testCompile
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> org.apache.maven.plugins
>> maven-surefire-plugin
>> 2.13
>> 
>> false
>> true
>> 
>> 
>> 
>> **/*Test.*
>> **/*Suite.*
>> 
>> 
>> 
>>
>> 
>> 
>> maven-assembly-plugin
>> 2.4.1
>> 
>> 
>> jar-with-dependencies
>> 
>> 
>> 
>> 
>> make-assembly
>> package
>> 
>> single
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>
>> 
>> org.apache.flink
>> flink-core
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-core
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-clients_2.11
>> 1.8.1
>> 
>>
>> 
>> org.apache.derby
>> derby
>> 10.13.1.1
>> 
>>
>> 
>> org.apache.flink
>> flink-jdbc_2.11
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-table-api-scala_2.11
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-table-api-java
>> 1.8.1
>> 
>>
>>
>> 
>> org.apache.flink
>> flink-table
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-table-planner_2.11
>> 1.8.1
>> 
>>
>>
>> 
>> org.apache.flink
>> flink-json
>> 1.8.1
>> 
>>
>> 
>> org.apache.flink
>> flink-scala_2.11
>> 1.8.1
>> 
>>
>>
>>org.apache.flink
>>flink-scala_2.11
>>1.8.1
>>
>>
>>
>>org.apache.flink
>>flink-streaming-scala_2.11
>>1.8.1
>>
>>
>>
>>org.apache.flink
>>

Flink and Nomad ( from Hashicorp)

2021-03-09 Thread Vishal Santoshi
Is there any reason not to have Nomad HA  on the lines of K8s HA ? I think
it would depend on how puggable the HA core code is ? Any links to how
ZK/K8s code specifically for HA would be highly appreciated


Re: Missing support for `TestStreamEnvironment#executeAsync`

2021-03-09 Thread Bob Tiernay
Great, thank you so much!

On Tue, Mar 9, 2021 at 1:08 PM Till Rohrmann  wrote:

> *This message originated outside your organization.*
>
> --
>
> Hi Bob,
>
> Thanks for reporting this issue. I believe that this has been an
> oversight. I have filed a JIRA issue for fixing this problem [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-21693
> 
>
> Cheers,
> Till
>
> On Mon, Mar 8, 2021 at 4:15 PM Bob Tiernay  wrote:
>
>> Hi all,
>>
>> I have been trying to test a Flink 1.11 streaming job using the
>> `DataStreamUtils#collect` utility against a `MiniCluster` based test.
>> However, I noticed an issue when doing so.
>>
>> `TestStreamEnvironment` does not implement `executeAsync`. Thus
>> when `DataStreamUtils#collect` is called, it invokes
>> `env.executeAsync("Data Stream Collect");` which will instead use
>> `StreamExecutionEnvironment#executeAsync`'s implementation. This is
>> problematic since it will create a brand new `MiniCluster` when the
>> following lines are hit:
>>
>> CompletableFuture jobClientFuture = executorFactory
>>.getExecutor(configuration)
>>.execute(streamGraph, configuration);
>>
>>
>> Any configurations that were applied during the test won't be respected.
>> Is this expected behavior?
>>
>> Thanks in advance,
>>
>> Bob
>>
>


Re: Missing support for `TestStreamEnvironment#executeAsync`

2021-03-09 Thread Till Rohrmann
Hi Bob,

Thanks for reporting this issue. I believe that this has been an oversight.
I have filed a JIRA issue for fixing this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-21693

Cheers,
Till

On Mon, Mar 8, 2021 at 4:15 PM Bob Tiernay  wrote:

> Hi all,
>
> I have been trying to test a Flink 1.11 streaming job using the
> `DataStreamUtils#collect` utility against a `MiniCluster` based test.
> However, I noticed an issue when doing so.
>
> `TestStreamEnvironment` does not implement `executeAsync`. Thus
> when `DataStreamUtils#collect` is called, it invokes
> `env.executeAsync("Data Stream Collect");` which will instead use
> `StreamExecutionEnvironment#executeAsync`'s implementation. This is
> problematic since it will create a brand new `MiniCluster` when the
> following lines are hit:
>
> CompletableFuture jobClientFuture = executorFactory
>.getExecutor(configuration)
>.execute(streamGraph, configuration);
>
>
> Any configurations that were applied during the test won't be respected.
> Is this expected behavior?
>
> Thanks in advance,
>
> Bob
>


Best practices for complex state manipulation

2021-03-09 Thread Dan Hill
Hi!

I'm working on a join setup that does fuzzy matching in case the client
does not send enough parameters to join by a foreign key.  There's a few
ways I can store the state.  I'm curious about best practices around this.
I'm using rocksdb as the state storage.

I was reading the code for IntervalJoin

and was a little shocked by the implementation.  It feels designed for very
short join intervals.

I read this set of pages

but I'm looking for one level deeper.  E.g. what are performance
characteristics of different types of state crud operations with rocksdb?
E.g. I could create extra MapState to act as an index.  When is this worth
it?


Re: failure checkpoint counts

2021-03-09 Thread Yun Tang
Hi Abdullah,

The "Connection refused" exception should have no direct relationship with 
checkpoint, I think you could check whether the socket source has worked well 
in your job.

Best
Yun Tang

From: Abdullah bin Omar 
Sent: Tuesday, March 9, 2021 0:13
To: user@flink.apache.org 
Subject: failure checkpoint counts

Hi,

I faced this exception at the time of checkpoint counts. Could you please 
inform me what the problem is here?

the exception:


org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=100)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:81)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:696)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:433)

at jdk.internal.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)

at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:564)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.net.ConnectException: Connection refused

at java.base/sun.nio.ch.Net.connect0(Native Method)

at java.base/sun.nio.ch.Net.connect(Net.java:574)

at java.base/sun.nio.ch.Net.connect(Net.java:563)

at 
java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588)

at 
java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:333)

at java.base/java.net.Socket.connect(Socket.java:648)

at 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:104)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)



Thank you!





Re: Is Ververica Connector Redis open source?

2021-03-09 Thread Yun Tang
Hi Yik,

As far as I know, the source code of ververica connector is not public, and you 
could refer to [1] for open-source implementation.


[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-redis

Best
Yun Tang




From: Yik San Chan 
Sent: Tuesday, March 9, 2021 12:01
To: user 
Subject: Is Ververica Connector Redis open source?

Hi community,

I found this package 
https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-redis/1.11-vvr-2.1.3
 in Maven Repository. However, I cannot find it anywhere in GitHub. Does anyone 
know this is open source or not?

Thank you!

Best,
Yik San Chan


Re: Future of QueryableState

2021-03-09 Thread Konstantin Knauf
Hi Maciek,

Thank you for reaching out. I'll try to answer your questions separately.

- nothing comparable. You already mention the State Processor API. Besides
that, I can only think of a side channel (CoFunction) that is used to
request a certain state that is then send to a side output and ultimate to
a sink, e.g. Kafka State Request Topic -> Flink -> Kafka State Response
Topic. This puts this complexity into the Flink Job, though.

- I think it is a combination of both. Queryable State works well within
its limitations. In the case of the RocksDBStatebackend this is mainly the
availability of the job and the fact that you might read "uncommitted"
state updates. In case of the heap-backed statebackends there are also
synchronization issues, e.g. you might read stale values. You also mention
the fact that queryable state has been an afterthought when it comes to
more recent deployment options. I am not aware of any Committer who
currently has the time to work on this to the degree that would be
required. So, we thought, it would be more fair and realistic to mark
Queryable State as "approaching end of life" in the sense that there is no
active development on that component anymore.

Best,

Konstantin

On Tue, Mar 9, 2021 at 7:08 AM Maciek Próchniak  wrote:

> Hello,
>
>
> We are using QueryableState in some of Nussknacker deployments as a nice
> addition, allowing end users to peek inside job state for a given key
> (we mostly use custom operators).
>
>
> Judging by mailing list and feature radar proposition by Stephan:
>
> https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg
>
>
> this feature is not widely used/supported. I'd like to ask:
>
> - are there any alternative ways of accessing state during job
> execution? State API is very nice, but it operates on checkpoints and
> loading whole state to lookup one key seems a bit heavy?
>
> - are there any inherent problems in QueryableState design (e.g. it's
> not feasible to use it in K8 settings, performance considerations) or
> just lack of interest/support (in that case we may offer some help)?
>
>
> thanks,
>
> maciek
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Stateful functions 2.2 and stop with savepoint

2021-03-09 Thread Igal Shilman
Hi Dylan,

Unfortunately stop with savepoint is not supported with StateFun.
We will bump the priority of this issue and try to address it in the next
bugfix release.

Thanks,
Igal.

On Mon, Mar 8, 2021 at 9:08 PM Meissner, Dylan <
dylan.t.meiss...@nordstrom.com> wrote:

> Thank you for this information, Piotr.
>
> The comment from Igal Shilman in FLINK-18894 issue says, "Obtaining a
> MAX_PRIO mailbox from StreamTask, solves this issue." I'm unclear what this
> means -- is this a workaround I can leverage?
>
> Dylan
> --
> *From:* Piotr Nowojski 
> *Sent:* Thursday, March 4, 2021 7:03 AM
> *To:* Kezhu Wang 
> *Cc:* Meissner, Dylan ;
> user@flink.apache.org 
> *Subject:* Re: Stateful functions 2.2 and stop with savepoint
>
> It doesn't change much ;) There is a known issue of stopping with
> savepoint and stateful functions not working [1]. The difference is that
> this one we will probably want to tackle sooner or later. Old streaming
> iterations are probably dead..
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18894
>
> czw., 4 mar 2021 o 15:56 Kezhu Wang  napisał(a):
>
> Hi all,
>
> My BAD!!!
>
> Sorry for apparent mess up in that moment.
>
> I will write a separate test for stream iterations.
>
>
> The stateful function part should be a separated issue.
>
>
> Best,
> Kezhu Wang
>
>
> On March 4, 2021 at 22:13:48, Piotr Nowojski (piotr.nowoj...@gmail.com)
> wrote:
>
> Hi Meissner,
>
> Can you clarify, are you talking about stateful functions? [1] Or the
> stream iterations [2]? The first e-mail suggests stateful functions, but
> the ticket that Kezhu created is talking about the latter.
>
> Piotrek
>
> [1] https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#iterations
>
>
>
> niedz., 28 lut 2021 o 15:33 Kezhu Wang  napisał(a):
>
> Hi,
>
> You could also try `cancel —withSavepoint [savepointDir]` even it is in
> deprecation. Comparing to take-savepoints and then cancel approach, there
> will be no checkpoints in between. This may be important if there are two
> phase commit operators in your job.
>
>
> Best,
> Kezhu Wang
>
>
> On February 28, 2021 at 20:50:29, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> Thank you for opening the bug and including the extra context.
>
> I'll track the progress and, in the meantime, I will work around by taking
> two separate actions when stopping job: take-savepoints, then cancel.
> --
> *From:* Kezhu Wang 
> *Sent:* Sunday, February 28, 2021 12:31 AM
> *To:* user@flink.apache.org ; Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com>
> *Subject:* Re: Stateful functions 2.2 and stop with savepoint
>
> Hi,
>
> Thanks for reporting. I think it is a Flink bug and have created
> FLINK-21522 for it. You could track progress there.
>
>
> FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522
>
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 00:59:04, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> I have an embedded function with a SinkFunction as an egress, implemented
> as this pseudo-code:
>
> val serializationSchema = KafkaSchemaSerializationSchema(... props
> required to use a Confluent Schema Registry with Avro, auth etc ...)
> return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
> props, AT_LEAST_ONCE))
>
> Checkpointing and taking a savepoint without stopping work as expected.
>
> However, when I run "flink stop " or even "flink stop --drain
> ", the operation never completes, reporting IN_PROGRESS until I hit
> the "failure-cause:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
> before completing" CompletedException.
>
> In the "Checkpoint History" it shows only 2 of my 3 operators completed
> their work:
>
> Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1
> (100%) | end-to-end duration: 638ms | data-size 1.38 KB
> feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
> | end-to-end duration: n/a | data-size: n/a
> feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
> data-size: 0 B
>
> I've been unable to gain any insights from logs so far. Thoughts?
>
>


Re: KafkaSource Problem

2021-03-09 Thread Bobby Richard
Great thanks, I was able to work around the issue by implementing my own
KafkaRecordDeserializer. I will take a stab at a PR to fix the bug, should
be an easy fix.

On Tue, Mar 9, 2021 at 9:26 AM Till Rohrmann  wrote:

> Hi Bobby,
>
> This is most likely a bug in Flink. Thanks a lot for reporting the issue
> and analyzing it. I have created an issue for tracking it [1].
>
> cc Becket.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21691
>
> Cheers,
> Till
>
> On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard 
> wrote:
>
>> I'm receiving the following exception when trying to use a KafkaSource
>> from the new DataSource API.
>>
>> Exception in thread "main" java.lang.NullPointerException
>> at
>> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>> at
>> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>>
>> Here is my code (kotlin)
>>
>> val kafkaSource = buildKafkaSource(params)
>> val datastream = env.fromSource(kafkaSource, 
>> WatermarkStrategy.noWatermarks(), "kafka")
>>
>> private fun buildKafkaSource(params: ParameterTool): KafkaSource {
>> val builder = KafkaSource.builder()
>> .setBootstrapServers(params.get("bootstrapServers"))
>> .setGroupId(params.get("groupId"))
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setTopics("topic")
>> 
>> .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>>
>> if (params.getBoolean("boundedSource", false)) {
>> builder.setBounded(OffsetsInitializer.latest())
>> }
>>
>> return builder.build()
>> }
>>
>>
>>
>>
>> I'm setting the deserializer using the ValueDeserializerWrapper as
>> described in the KafkaSourceBuilder javadoc example
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>>
>> Looking at the code for the ValueDeserializerWrapper, it appears that the
>> deserializer isn't actually set until the deserialize method is called, but
>> getProducedType is actually called first resulting in the
>> NullPointerException. What am I missing?
>>
>> Thanks,
>> Bobby
>>
>> This electronic communication and the information and any files
>> transmitted with it, or attached to it, are confidential and are intended
>> solely for the use of the individual or entity to whom it is addressed and
>> may contain information that is confidential, legally privileged, protected
>> by privacy laws, or otherwise restricted from disclosure to anyone else. If
>> you are not the intended recipient or the person responsible for delivering
>> the e-mail to the intended recipient, you are hereby notified that any use,
>> copying, distributing, dissemination, forwarding, printing, or copying of
>> this e-mail is strictly prohibited. If you received this e-mail in error,
>> please return the e-mail to the sender, delete it from your computer, and
>> destroy any printed copy of it.
>
>

-- 

*Bobby Richard*
R Software Engineer   | Information Security Group   | Symantec
Enterprise Division
Broadcom

mobile: 337.794.2128

Atlanta, GA (USA)
bobby.rich...@broadcom.com   | broadcom.com

-- 
This electronic communication and the information and any files transmitted 
with it, or attached to it, are confidential and are intended solely for 
the use of the individual or entity to whom it is addressed and may contain 
information that is confidential, legally privileged, protected by privacy 
laws, or otherwise restricted from disclosure to anyone else. If you are 
not the intended recipient or the person responsible for delivering the 
e-mail to the intended recipient, you are hereby notified that any use, 
copying, distributing, dissemination, forwarding, printing, or copying of 
this e-mail is strictly prohibited. If you received this e-mail in error, 
please return the e-mail to the sender, delete it from your computer, and 
destroy any printed copy of it.


smime.p7s
Description: S/MIME Cryptographic Signature


Re: KafkaSource Problem

2021-03-09 Thread Till Rohrmann
Hi Bobby,

This is most likely a bug in Flink. Thanks a lot for reporting the issue
and analyzing it. I have created an issue for tracking it [1].

cc Becket.

[1] https://issues.apache.org/jira/browse/FLINK-21691

Cheers,
Till

On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard 
wrote:

> I'm receiving the following exception when trying to use a KafkaSource
> from the new DataSource API.
>
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
> at
> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>
> Here is my code (kotlin)
>
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource, 
> WatermarkStrategy.noWatermarks(), "kafka")
>
> private fun buildKafkaSource(params: ParameterTool): KafkaSource {
> val builder = KafkaSource.builder()
> .setBootstrapServers(params.get("bootstrapServers"))
> .setGroupId(params.get("groupId"))
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setTopics("topic")
> 
> .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>
> if (params.getBoolean("boundedSource", false)) {
> builder.setBounded(OffsetsInitializer.latest())
> }
>
> return builder.build()
> }
>
>
>
>
> I'm setting the deserializer using the ValueDeserializerWrapper as
> described in the KafkaSourceBuilder javadoc example
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>
> Looking at the code for the ValueDeserializerWrapper, it appears that the
> deserializer isn't actually set until the deserialize method is called, but
> getProducedType is actually called first resulting in the
> NullPointerException. What am I missing?
>
> Thanks,
> Bobby
>
> This electronic communication and the information and any files
> transmitted with it, or attached to it, are confidential and are intended
> solely for the use of the individual or entity to whom it is addressed and
> may contain information that is confidential, legally privileged, protected
> by privacy laws, or otherwise restricted from disclosure to anyone else. If
> you are not the intended recipient or the person responsible for delivering
> the e-mail to the intended recipient, you are hereby notified that any use,
> copying, distributing, dissemination, forwarding, printing, or copying of
> this e-mail is strictly prohibited. If you received this e-mail in error,
> please return the e-mail to the sender, delete it from your computer, and
> destroy any printed copy of it.


退订

2021-03-09 Thread Miro.Zheng
退订

Re: Trigger and completed Checkpointing do not appeared

2021-03-09 Thread Smile
Hi, 

After implementing SourceFunction, you can use it to create a DataStream
using env.addSource() in your main method.
For example, if you have your custom source class with the name CustomSource
that implements SourceFunction, then it can be used for getting
input data and the if-statement after it can be removed:

// get input data
DataStream text = env.addSource(new CustomSource());


ExampleCountSource in [1] implements SourceFunction, which can be used
to get a DataStream with type Long, not String, such as:

DataStream numbers = env.addSource(new ExampleCountSource());


If you only want to have a look at how checkpoint being triggered, see [2]
for another sample that has a custom endless source named TransactionSource.
When enabled checkpoint it can be triggered with your rules. It might be
easier for a beginner than implement it by yourself.
However, it may not restore from a checkpoint perfectly since it doesn't
implement CheckpointedFunction. That is to say, if you want your source to
be restored successfully after failures, CheckpointedFunction is also
necessary and ExampleCountSource in [1] is a good example.


[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html

Regards,
Smile




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


flink 1.12.2??????????????????????????????????????????????????

2021-03-09 Thread Asahi Lee
????flink 
1.12??flinkDataStream 
API,??RuntimeExecutionMode.BATCH??
 

package com.meritdata.cloud.tempo.dw.flink.test.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JDBCTest {

public static void main(String[] args) {
test();
/**
 * ??
 * EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
 * .useBlinkPlanner().inBatchMode().build();
 * TableEnvironment bbTableEnv = 
TableEnvironment.create(bbSettings);
 * ++--+
 * |  a |   EXPR$1 |
 * ++--+
 * |  2 |1 |
 * |  3 |2 |
 * |  1 |2 |
 * |  4 |1 |
 * ++--+
 */
//test1();

/**
 * ??API
 * StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 * streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
 * StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv);
 * +++--+
 * | op |  a |   EXPR$1 |
 * +++--+
 * | +I |  2 |1 |
 * | +I |  1 |1 |
 * | +I |  4 |1 |
 * | -U |  1 |1 |
 * | +U |  1 |2 |
 * | +I |  3 |1 |
 * | -U |  3 |1 |
 * | +U |  3 |2 |
 * +++--+
 */
}

public static void test() {
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

bbTableEnv.executeSql("CREATE TABLE ab (" +
"  a STRING, " +
"  b INT " +
") WITH (" +
"   'connector' = 'jdbc'," +
"   'url' = 
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
"   'username' = 'root'," +
"   'password' = 'root'," +
"   'table-name' = 'ab'" +
" )");

bbTableEnv.sqlQuery("select a, count(b) from ab group by 
a").execute().print();

}

public static void test1() {
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv);

tableEnv.executeSql("CREATE TABLE ab (" +
"  a STRING, " +
"  b INT " +
") WITH (" +
"   'connector' = 'jdbc'," +
"   'url' = 
'jdbc:mysql://localhost:3306/a?serverTimezone=UTC'," +
"   'username' = 'root'," +
"   'password' = 'root'," +
"   'table-name' = 'ab'" +
" )");

tableEnv.sqlQuery("select a, count(b) from ab group by 
a").execute().print();
}

}

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。

yidan zhao  于2021年3月9日周二 下午7:26写道:

> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
> 然后目前通过Flink的web-ui看了下gc情况。
> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>
> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>我目前5个TM的集群,单TM100G内存,跑任务大概10w
> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>
>
> Michael Ran  于2021年3月9日周二 下午4:27写道:
>
>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>> >
>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>> >
>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>> >
>> >> 好的,我会看下。
>> >> 然后我今天发现我好多个集群GC collector不一样。
>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> >> threads,还有一种是Mark Sweep Compact GC。
>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>> >>
>> >>
>> >>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>> >>
>> >>
>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>> >>
>> >>> Hi,
>> >>>
>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>> >>>
>> >>> Best,
>> >>> jjiey
>> >>>
>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>> >>> >
>> >>> >
>> >>>
>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>> >>> > leadership’ 错导致任务重启。
>> >>> >
>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>> >>> > 2021-03-08 14:31:40
>> >>> > org.apache.flink.runtime.io
>> >>> .network.netty.exception.RemoteTransportException:
>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>> >>> > NettyMessageClientDecoderDelegate.java:115)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>> >>> > 1410)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>> >>> > AbstractEpollStreamChannel.java:792)
>> >>> >at
>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> >>> > .processReady(EpollEventLoop.java:475)
>> >>> >at
>> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> >>> > .run(EpollEventLoop.java:378)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> >>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> >>> >at java.lang.Thread.run(Thread.java:748)
>> >>> > Caused by: org.apache.flink.runtime.io.network.partition.
>> >>> > ProducerFailedException: 

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
然后目前通过Flink的web-ui看了下gc情况。
发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。

(1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
(2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
   我目前5个TM的集群,单TM100G内存,跑任务大概10w
qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。


Michael Ran  于2021年3月9日周二 下午4:27写道:

> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
> >
> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
> >
> >yidan zhao  于2021年3月9日周二 下午2:56写道:
> >
> >> 好的,我会看下。
> >> 然后我今天发现我好多个集群GC collector不一样。
> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
> >> threads,还有一种是Mark Sweep Compact GC。
> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
> >>
> >>
> >>
> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
> >>
> >>
> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
> >>
> >>> Hi,
> >>>
> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
> >>>
> >>> Best,
> >>> jjiey
> >>>
> >>> > 2021年3月8日 14:37,yidan zhao  写道:
> >>> >
> >>> >
> >>>
> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
> >>> > leadership’ 错导致任务重启。
> >>> >
> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
> >>> > 2021-03-08 14:31:40
> >>> > org.apache.flink.runtime.io
> >>> .network.netty.exception.RemoteTransportException:
> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
> >>> > CreditBasedPartitionRequestClientHandler.java:294)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
> >>> > CreditBasedPartitionRequestClientHandler.java:183)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > NettyMessageClientDecoderDelegate.channelRead(
> >>> > NettyMessageClientDecoderDelegate.java:115)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
> >>> > 1410)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
> >>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
> >>> > AbstractEpollStreamChannel.java:792)
> >>> >at
> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> >>> > .processReady(EpollEventLoop.java:475)
> >>> >at
> >>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> >>> > .run(EpollEventLoop.java:378)
> >>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> >>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> >>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
> >>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >>> >at java.lang.Thread.run(Thread.java:748)
> >>> > Caused by: org.apache.flink.runtime.io.network.partition.
> >>> > ProducerFailedException: org.apache.flink.util.FlinkException:
> >>> JobManager
> >>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
> >>> >at org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue
> >>> > 

Re: Trigger and completed Checkpointing do not appeared

2021-03-09 Thread Abdullah bin Omar
Hi Smile,

Thank you for your reply.

I read [1] according to the last email. I will have to add implements
SourceFunction   CheckpointedFunction with the main class. Then
calling run() and cancel() inside the main class. Is it correct?

I just run the sample code from apache flink. I can not understand
everything.

In this case, could you please inform me where I will have to change in my
code (it is in the first email) and which one to replace? If you can
answer, it will be really helpful for me as a newbie.

Thank you in advance!

On Tue, Mar 9, 2021 at 4:07 AM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi Smile,
>
> Thank you for your reply.
>
> I read [1] according to the last email. I will have to add implements
> SourceFunction   CheckpointedFunction with the main class. Then
> calling run() and cancel() inside the main class. Is it correct?
>
> I just run the sample code from apache flink. I can not understand
> everything.
>
> In this case, could you please inform me where I will have to change in my
> code (it is in the first email) and which one to replace? If you can
> answer, it will be really helpful for me as a newbie.
>
> Thank you in advance!
>
>
>
> On Mon, Mar 8, 2021 at 10:21 PM Smile  wrote:
>
>> Hi,
>>
>> Could you please change the source to an endless one? For example a Kafka
>> source or a custom source that implements SourceFunction([1])?
>> env.readTextFile() won't wait for all data to be finished, but exit
>> immediately after telling readers what to read. So it may exit before the
>> first checkpoint being triggered. See [2] for more information.
>>
>> [1].
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>> [2].
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-
>>
>> Regards,
>> Smile
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-09 Thread Guowei Ma
Hi, all

Thanks all for your suggestions and feedback.
I think it is a good idea that we increase the default size of the
separated pool by testing. I am fine with adding the suffix(".size") to the
config name, which makes it more clear to the user.
But I am a little worried about adding a prefix("framework") because
currently the tm shuffle service is only a shuffle-plugin, which is not a
part of the framework. So maybe we could add a clear explanation in the
document?

Best,
Guowei


On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧)  wrote:

> Thanks for the suggestions. I will do some tests and share the results
> after the implementation is ready. Then we can give a proper default value.
>
> Best,
> Yingjie
>
> --
> 发件人:Till Rohrmann
> 日 期:2021年03月05日 23:03:10
> 收件人:Stephan Ewen
> 抄 送:dev; user; Xintong Song<
> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma<
> guowei@gmail.com>
> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
> shuffle
>
> Thanks for this proposal Guowei. +1 for it.
>
> Concerning the default size, maybe we can run some experiments and see how
> the system behaves with different pool sizes.
>
> Cheers,
> Till
>
> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen  wrote:
>
>> Thanks Guowei, for the proposal.
>>
>> As discussed offline already, I think this sounds good.
>>
>> One thought is that 16m sounds very small for a default read buffer pool.
>> How risky do you think it is to increase this to 32m or 64m?
>>
>> Best,
>> Stephan
>>
>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:
>>
>>> Hi, all
>>>
>>>
>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>> main reason is that the default configuration always makes users encounter
>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>> to avoid the problem.
>>> Goals
>>>
>>>1. Don't affect the streaming and pipelined-shuffle-only batch
>>>setups.
>>>2. Don't mix memory with different life cycle in the same pool.
>>>E.g., write buffers needed by running tasks and read buffer needed even
>>>after tasks being finished.
>>>3. User can use the TM merge shuffle with default memory
>>>configurations. (May need further tunings for performance optimization, 
>>> but
>>>should not fail with the default configurations.)
>>>
>>> Proposal
>>>
>>>1. Introduce a configuration `taskmanager.memory.network.batch-read`
>>>to specify the size of this memory pool. The default value is 16m.
>>>2. Allocate the pool lazily. It means that the memory pool would be
>>>allocated when the TM merge shuffle is used at the first time.
>>>3. This pool size will not be add up to the TM's total memory size,
>>>but will be considered part of
>>>`taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>pool size is not larger than the framework off-heap size, if TM merge
>>>shuffle is enabled.
>>>
>>>
>>> In this default configuration, the allocation of the memory pool is
>>> almost impossible to fail. Currently the default framework’s off-heap
>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>> data [2].
>>> Known Limitation
>>> Usability for increasing the memory pool size
>>>
>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>> at the same time. It also means that once the user forgets this, it is
>>> likely to fail the check when allocating the memory pool.
>>>
>>>
>>> So in the following two situations, we will still prompt the user to
>>> increase the size of `framework.off-heap.size`.
>>>
>>>1. `taskmanager.memory.network.batch-read` is bigger than
>>>`taskmanager.memory.framework.off-heap.size`
>>>2. Allocating the pool encounters the OOM.
>>>
>>>
>>> An alternative is that when the user adjusts the size of the memory
>>> pool, the system automatically adjusts it. But we are not entierly sure
>>> about this, given its implicity and complicating the memory configurations.
>>> Potential memory waste
>>>
>>> In the first step, the memory pool will not be released once allocated. This
>>> means in the first step, even if there is no subsequent batch job, the
>>> pooled memory cannot be used by other consumers.
>>>
>>>
>>> We are not releasing the pool in the first step due to the concern that
>>> frequently allocating/deallocating the entire pool may increase the GC
>>> pressue. Investitations on how to dynamically release the pool when it's no
>>> longer needed is considered a future follow-up.
>>>
>>>
>>> Looking forward to your feedback.
>>>
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>
>>> [2] https://github.com/apache/flink/pull/7368.
>>> 

PyFlink UDTF ???????????????? NullPointerException

2021-03-09 Thread ??????
PyFlink UDTFUDTF


class Mac(TableFunction):
  def eval(self, body_data):
mac_arr = [

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"44:c7:fc:24:0f:91","rssi":"-92","range":"100.0"},

{"mac":"08:10:79:67:16:1b","rssi":"-85","router":"Netcore_67161B","range":"55.0"}
]
return mac_arr


table_env.create_temporary_function("mac", udtf(Mac(), 
result_types=[DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING())]))



??


INSERT INTO MacTableSink select data['mac'] mac, data['rssi'] rssi, 
data['router'] router, data['range'] distance from KafkaMacTable, lateral 
table(mac(body_data)) as T(data)


??


2021-03-09 17:19:29
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
 at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at 

Re:Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-09 Thread Michael Ran
1.两套逻辑结果,只能定时任务做check2.同一套逻辑,就要具体分析了,只要不是一个人、一套代码逻辑出来的,都有可能出问题
在 2021-03-09 12:51:50,"Smile"  写道:
>对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。
>有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread Michael Ran
看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
在 2021-03-09 14:57:43,"yidan zhao"  写道:
>而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>
>或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>
>yidan zhao  于2021年3月9日周二 下午2:56写道:
>
>> 好的,我会看下。
>> 然后我今天发现我好多个集群GC collector不一样。
>> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> threads,还有一种是Mark Sweep Compact GC。
>> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>
>>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>
>>
>> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>
>>> Hi,
>>>
>>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>>
>>> Best,
>>> jjiey
>>>
>>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >
>>> >
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> > leadership’ 错导致任务重启。
>>> >
>>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> > 2021-03-08 14:31:40
>>> > org.apache.flink.runtime.io
>>> .network.netty.exception.RemoteTransportException:
>>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.runtime.io.network.netty.
>>> > NettyMessageClientDecoderDelegate.channelRead(
>>> > NettyMessageClientDecoderDelegate.java:115)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> > .java:357)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >
>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>>> > 1410)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:379)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> > AbstractChannelHandlerContext.java:365)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>>> > AbstractEpollStreamChannel.java:792)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .processReady(EpollEventLoop.java:475)
>>> >at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>>> > .run(EpollEventLoop.java:378)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> >at java.lang.Thread.run(Thread.java:748)
>>> > Caused by: org.apache.flink.runtime.io.network.partition.
>>> > ProducerFailedException: org.apache.flink.util.FlinkException:
>>> JobManager
>>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
>>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>>> > .userEventTriggered(PartitionRequestQueue.java:170)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> > AbstractChannelHandlerContext.invokeUserEventTriggered(
>>> > AbstractChannelHandlerContext.java:346)
>>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >