Re: flink on K8S(operator) 如何获取 Accumulator

2023-03-06 文章 Weihua Hu
Hi,

按照你的描述,我猜测你使用的是 Application 模式吧?这种模式下 user code 会在 JobManager 侧执行,Job
执行结束后会直接 shutdown cluster。

可以尝试使用 session mode[1] 部署 cluster

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode

Best,
Weihua


On Mon, Mar 6, 2023 at 8:54 PM wangwei  wrote:

>
> Hi,大佬们
>
> 如何在任务结束后获取Accumulator 数据?
> 参考代码:(但是无法获取)
> ableResult execute = statementSet.execute();
> Optional jobClient = execute.getJobClient();
> jobClient.get().getAccumulators().get()
>
> PS: 最初的需求是: 对任务同步的数据量做统计。希望在批任务结束后,准确的获取Accumulator 中值,但是在K8S 中无法获取?
>
> 大佬求助!!先磕为敬
>


FlinkCEP GroupPattern 匹配结果异常问题

2023-03-06 文章 傅宣登
各位好,

我发现 FlinkCEP 中涉及 GroupPattern 的一些查询的匹配结果和直觉不太符合。

以如下查询为例:

Pattern.begin("pl").where(
new SimpleCondition() {
@Override
public boolean filter(Event value) {
return value.getName() == 2;
}
}
).times(1,3).optional().consecutive()

当输入数据流(格式为 e(id,name,price))为

e(1,1,0), e(2,2,5), e(3,1,0), e(4,2,2), e(5,1,0), e(6,3,2), e(7,1,0), e(8,2,5), 
e(9,1,8)

时,其输出的匹配有三个,分别为:

pl: e(2,2,5)
pl: e(4,2,2)
p: e(8,2,5)

但是如果在查询外面加一个 GroupPattern,即查询为

Pattern.begin(
Pattern.begin("pl").where(
new SimpleCondition() {
@Override
public boolean filter(Event value) {
return value.getName() == 2;
}
}
).times(1,3).optional().consecutive()
)

时,输出的匹配结果却是一个匹配,具体为:

pl: e(2,2,5), e(4,2,2), e(8,2,5)

我们认为这样的结果不太符合直觉。

直觉上来说,一个 pattern sequence 外面套一层 group,匹配结果应该是一样的,为何会出现这样的结果?
而且后者的结果看起来和文档里说明的也不太正确,明明加了 `consecutive()` 却返回了不连续的事件。

请问这样的情况是有 bug 还是有意为之?如果是后者是有什么考量吗?

谢谢!

Flink version: 1.14.5
Java version: 11.0.15
flink-cep_2.11: 1.14.2



傅宣登,
中国科学院软件研究所
f...@ios.ac.cn


Re: Flink作业tm Connection timed out异常问题

2023-03-06 文章 Shammon FY
Hi

很多原因都可能会导致连接失败问题,包括机器故障、系统问题或者服务器负载,如果是怀疑负载问题你可以找几台服务器和这台有疑问的服务器组成个小集群,提交一些作业,让这台服务器负载不要太高,观察一下作业运行情况

Best,
Shammon

On Mon, Mar 6, 2023 at 8:49 PM crazy <2463829...@qq.com.invalid> wrote:

> 报错日志下面这个一样,是同一个问题么
> https://issues.apache.org/jira/browse/FLINK-19925
>
>
> 其中描述到服务器 "high cpu usage or high network pressure" 可能会导致这个原因,想问下cpu usage,
> network咋样才算高?
>
>
>
>
> crazy
> 2463829...@qq.com
>
>
>
> 
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> tanyuxinw...@gmail.com;
> 发送时间:2023年3月6日(星期一) 下午2:59
> 收件人:"user-zh"
> 主题:Re: Flink作业tm Connection timed out异常问题
>
>
>
> 不建议这样做,因为这样会掩盖问题。
>
> 但如果一定要配置"重试次数"或"超时时长" 这些参数,会涉及到很多参数,比如 akka.tcp.timeout,
> taskmanager.network.netty.client.connectTimeoutSec,
> taskmanager.network.retries等等,具体可以参考[1]。
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/
>
> Best,
> Yuxin
>
>
> crazy <2463829...@qq.com.invalid 于2023年3月6日周一 14:41写道:
>
>  机器问题从监控上暂时没发现啥问题,能否通过增加"重试次数"或"超时时长"来缓解这个问题呢?不太清楚具体参数需要设置哪些?
> 
> 
> 
> 
>  crazy
>  2463829...@qq.com
> 
> 
> 
>  nbsp;
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  tanyuxinw...@gmail.comgt;;
>  发送时间:nbsp;2023年3月6日(星期一) 下午2:33
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: Flink作业tm Connection timed out异常问题
> 
> 
> 
>  "如果进程没被调度到这台机器上,任务正常",从给出的描述来看,确实很可能是 A 这台机器有问题。
> 
>  可以检查机器 A 的网络、内存、CPU
>  指标或者监控是否正常,与其他机器是否存在不同。比如网络参数的配置、机器内存是否存在损坏、机器是否存在异常进程或负载等等。
> 
>  如果硬件问题,系统日志有可能有一些报错。也可以使用一些机器检查工具, dmesg/vmstat等。
> 
>  Best,
>  Yuxin
> 
> 
>  crazy <2463829...@qq.com.invalidgt; 于2023年3月6日周一 14:23写道:
> 
>  gt; 各位大佬好,有个线上作业频繁failover,异常日志如下:
>  gt;
>  gt; 2023-03-05 11:41:07,847 INFOnbsp;
> 
> org.apache.flink.runtime.executiongraph.ExecutionGraphnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  [] - Process (287/300) (b3ef27fec49fe3777f830802ef3501e9) switched
> from
>  RUNNING to FAILED on container_e26_1646120234560_82135_01_97 @
>  xx.xx.xx.xx (dataPort=26882).
>  gt; org.apache.flink.runtime.io
> .network.netty.exception.LocalTransportException:
>  readAddress(..) failed: Connection timed out (connection to 'xxx/
>  10.70.89.25:43923')
>  gt; at org.apache.flink.runtime.io
> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at
> 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ~[flink-dist_2.11-1.13.5.jar:1.13.5]
>  gt; at java.lang.Thread.run(Thread.java:748)
> ~[?:1.8.0_131]
>  gt; Caused by:
> 
> 

Re: flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 Shammon FY
Hi

如果没有现成的系统函数,你可以写个自定义udf来实现
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/

Best,
Shammon


On Mon, Mar 6, 2023 at 7:46 PM 唐世伟  wrote:

>
> 我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?


flink cdc connector计划支持hudi change data capture吗?

2023-03-06 文章 casel.chen
flink cdc connector计划支持hudi change data capture吗?

flink on K8S(operator) 如何获取 Accumulator

2023-03-06 文章 wangwei

Hi,大佬们

如何在任务结束后获取Accumulator 数据?
参考代码:(但是无法获取)
ableResult execute = statementSet.execute();
Optional jobClient = execute.getJobClient();
jobClient.get().getAccumulators().get()

PS: 最初的需求是: 对任务同步的数据量做统计。希望在批任务结束后,准确的获取Accumulator 中值,但是在K8S 中无法获取?

大佬求助!!先磕为敬


flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 唐世伟
我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?