Re: Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread amen...@163.com
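Thanks for the reply.

With the Hive version set to 2.1.1, I imported the hive-exec-2.1.1 and flink-connector-hive_2.11-1.11.1 dependencies into the program, and Hive tables
can now be read and written normally.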

best,
amenhub

 
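From: Rui Li
Sent: 2020-08-24 21:33
To: user-zh
Subject: Re: Hadoop conflict caused by the hive-exec dependency
Hi,
 
hive-exec itself does not bundle Hadoop. If Hadoop is pulled in through Maven transitive dependencies, you can exclude it when packaging. At runtime, use your cluster's Hadoop version rather than the Hadoop version that Hive itself depends on.
For Flink 1.11 you can also consider the officially provided flink-sql-connector-hive uber jar.
This jar contains all Hive dependencies (Hadoop dependencies still have to be added separately). For more details, see the documentation [1][2].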
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes
 
On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com  wrote:
 
>
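> One more note: after I removed the Hadoop dependencies brought into the program by hive-exec and others, the job still fails, so I may have missed something. I suspected a dependency conflict because, before testing the Hive integration, I had submitted the job to YARN without any error, which is what led me to Hive.
> It now looks like something else may be the cause. Part of the exception stack follows: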
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> ... 19 more
> Caused by: java.lang.ClassCastException:
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto
> cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
> at
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
> ... 24 more
>
> best,
> amenhub
>
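> From: amen...@163.com
> Sent: 2020-08-24 20:40
> To: user-zh
> Subject: Hadoop conflict caused by the hive-exec dependency
> hi, everyone
>
> Component versions: flink-1.11.1, hive-2.1.1
>
> Problem description:
> I wrote a kafka2mysql streaming demo using the Table
> API executeSql() method. Without the hive-exec dependency, the packaged job runs normally when submitted to the YARN cluster.
>
> When testing HiveCatalog and reading/writing Hive tables, the job runs without errors on a standalone cluster, and Hive tables are read and written normally from Flink
> (no Hadoop dependency conflict occurs).
>
> When submitted to YARN, however, a Hadoop conflict occurs. Inspecting the program's dependencies in IDEA shows that hive-exec automatically pulls in Hadoop and HDFS dependencies at version 2.6.1, which conflict with the Hadoop jars of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).
>
>
> Has anyone in the community run into this? The docs suggest that, when there is no officially provided Hive bundle for your version, you download the hive-exec dependency for your own version. In that case a Hadoop version different from the cluster's is pulled in implicitly, which is bound to cause a conflict. Is there something I have configured incorrectly?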
>
> best,
> amenhub
>
 
 
-- 
Best regards!
Rui Li
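
A note on the ClassCastException quoted above: it says a YARN protobuf request class cannot be cast to the protobuf Message class shaded inside HBase, which suggests that two copies of overlapping classes are on the classpath. A minimal diagnostic sketch, assuming you can run a small class with the same classpath as the job client: it prints which jar each class is loaded from. The class name ClassOrigin and the default class names are illustrative choices, not part of Flink, Hive or Hadoop.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or a marker for JDK classes. */
    static String originOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "<bootstrap/JDK>" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Pass fully qualified class names as arguments; the defaults are only examples.
        String[] names = args.length > 0 ? args : new String[] {
            "com.google.protobuf.Message",
            "org.apache.hadoop.ipc.ProtobufRpcEngine"
        };
        for (String name : names) {
            try {
                System.out.println(name + " -> " + originOf(Class.forName(name)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " -> not on classpath");
            }
        }
    }
}
```

If a Hadoop or protobuf class resolves to a jar bundled into the job's fat jar rather than to the cluster's Hadoop installation, that jar is a candidate for exclusion when packaging.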


flink1.11 SQL question

2020-08-24 Thread 酷酷的浑蛋




With Flink 1.11,
when reading JSON data with format="json", if a field's value is nested like [{"a1":{"a2":"v2"}}], the value Flink returns is empty. How should this be handled?

Two questions about submitting jobs in application mode with Flink on YARN

2020-08-24 Thread yang zhang
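1. In Flink on YARN application mode, how do I submit an application made up of multiple jobs? The articles on the official site and the forum do not go into detail here.

Compared with per-job mode, application
mode allows submitting an application that consists of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch each job. With the blocking
execute() method, jobs run sequentially: the "next" job is postponed until "this" job finishes. In contrast, the non-blocking executeAsync() method continues to submit the "next" job as soon as the current job is submitted.

How is this done in practice?

2. Also, when submitting the job, no class containing the main method is specified. How does the JobManager find the main method to run?
For example, this command from the official site:
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
./MyApplication.jar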



Sent from my iPhone
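
On question 2, one likely answer is the jar manifest: when no entry class is given on the command line, Flink's client, as far as I know, falls back to the `program-class` attribute and then the standard `Main-Class` attribute of the jar's manifest. The sketch below uses only the JDK to show that lookup; the class name ManifestEntryPoint, the helper writeDemoJar and the class com.example.MyApplication are hypothetical and exist only to make the example self-contained.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class ManifestEntryPoint {

    /** Returns the entry class named in the jar manifest, or null if none is declared. */
    static String entryClassOf(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null;
            }
            Attributes attrs = manifest.getMainAttributes();
            // Assumption: "program-class" is checked first, then "Main-Class".
            String programClass = attrs.getValue("program-class");
            return programClass != null ? programClass : attrs.getValue("Main-Class");
        }
    }

    /** Writes an otherwise empty jar whose manifest declares the given Main-Class (demo only). */
    static Path writeDemoJar(String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        // Without Manifest-Version the main attributes are not written out.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        manifest.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        Path jar = Files.createTempFile("demo", ".jar");
        try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar), manifest)) {
            // The manifest entry is written by the constructor; no other entries are needed.
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        Path jar = args.length > 0 ? Paths.get(args[0]) : writeDemoJar("com.example.MyApplication");
        System.out.println("entry class: " + entryClassOf(jar));
    }
}
```

Running it against ./MyApplication.jar shows which class would be picked up. If the manifest declares neither attribute, the entry class has to be passed explicitly when submitting.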

Re: Flink Couchbase

2020-08-24 Thread Vijayendra Yadav
Thanks Yang that helped.

Sent from my iPhone

> On Aug 24, 2020, at 8:44 PM, Yang Wang  wrote:
> 
> 
> I think at least you have two different exceptions.
> 
> > java.lang.Exception: Container released on a *lost* node
> This usually means a Yarn nodemanager is down. So all the containers running 
> on this node will be 
> released and rescheduled to a new one. If you want to figure out the root 
> cause, you need to check
> the Yarn nodemanager logs.
> 
> > java.lang.OutOfMemoryError: Metaspace
> Could you check the value of flink configuration 
> "taskmanager.memory.jvm-metaspace.size"? If it is
> too small, increasing it will help. Usually, 256m is enough for most cases.
> 
> 
> Best,
> Yang
> 
> On Tue, Aug 25, 2020 at 4:51 AM Vijayendra Yadav wrote:
>> Another one -
>> 
>> Exception in thread "FileCache shutdown hook"
>> Exception: java.lang.OutOfMemoryError thrown from the 
>> UncaughtExceptionHandler in thread "FileCache shutdown hook"
>> 
>> Regards,
>> Vijay
>> 
>>> On Mon, Aug 24, 2020 at 1:04 PM Vijayendra Yadav  
>>> wrote:
>>> Actually got this message in rolledover container logs: 
>>> 
>>> [org.slf4j.impl.Log4jLoggerFactory]
>>> Exception in thread "cb-timer-1-1" java.lang.OutOfMemoryError: Metaspace
>>> Exception in thread "Thread-16" java.lang.OutOfMemoryError: Metaspace
>>> Exception in thread "TransientBlobCache shutdown hook" 
>>> java.lang.OutOfMemoryError: Metaspace
>>> Exception in thread "FileChannelManagerImpl-io shutdown hook" 
>>> java.lang.OutOfMemoryError: Metaspace
>>> Exception in thread "Kafka Fetcher for Source: flink-kafka-consumer -> Map 
>>> -> Filter -> Map -> Sink: s3-sink-raw (2/3)" java.lang.OutOfMemoryError: 
>>> Metaspace
>>> Exception in thread "FileCache shutdown hook" java.lang.OutOfMemoryError: 
>>> Metaspace
>>> Any suggestions on how to fix it ?
>>> 
>>> 
 On Mon, Aug 24, 2020 at 12:53 PM Vijayendra Yadav  
 wrote:
 Hi Team,
 
 Running a flink job on Yarn, I am trying to make connections to couchbase 
 DB in one of my map functions in Flink Streaming job. But my task manager 
 containers keep failing
 and keep assigning new containers and not giving me an opportunity to get 
 any useful logs. 
 
  val cluster = Cluster.connect("host", "user", "pwd")
  val bucket = cluster.bucket("bucket")
  val collection = bucket.defaultCollection
 
 Only thing I see is yarn exception: 
 
 java.lang.Exception: Container released on a *lost* node
 at 
 org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
 at 
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
 at 
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
 at 
 org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at 
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
 
 
 Could you please provide any insight on how to get logs. And why a simple 
 connection will not work.
 
 Note: it works in my local system yarn.
 
 Regards,
 Vijay


Re: flink 1.10: how to monitor managed memory usage

2020-08-24 Thread Xintong Song
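There is already a related mailing list discussion [1] and a Jira issue [2] on this. As far as I can tell, the details of the approach have not been fully settled yet.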

Thank you~

Xintong Song


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-102-Add-More-Metrics-to-TaskManager-td37898.html
[2] https://issues.apache.org/jira/browse/FLINK-14431

On Tue, Aug 25, 2020 at 11:45 AM ouywl  wrote:

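>  @Xintong Song
> I think we could create a Jira issue to track this: adjust the metric values so that they correspond to the TaskManager memory model, and then show them in the Flink
> web UI, so that I can better help users tune parameters. At present, to see the memory model of a Flink TM we have to read the logs and then calculate from a table. We could add charts to the Flink
> web UI showing the usage and allocation of each memory region. In our experience this would be a very useful feature.
>
> Best,
> Ouywl
>
> On 2020-08-25 10:40, Xintong Song wrote:
>
> The TaskManager metrics shown in the Flink web UI are currently problematic: the metrics do not map well onto the individual parts of the TaskManager
> memory model. The community is discussing how to improve this; see FLIP-102.
>
> Regarding managed memory: as you said, it is not included in the metrics above.
>
> Monitoring managed memory usage is not really necessary:
>
> - For batch jobs, all configured managed memory gets used. Operators decide how much buffer space to request based on the amount of managed memory,
>   so no memory is left unused.
> - For streaming jobs:
>   - With the RocksDB state backend:
>     - By default, RocksDB also decides how much cache to request based on the amount of managed memory.
>     - For compatibility with earlier versions, RocksDB can also be configured not to derive its memory size from managed memory. In that case, you can monitor
>       RocksDB's own metrics to judge memory usage.
>   - With other state backends, managed memory is not used and should be configured as 0.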
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 24, 2020 at 5:53 PM 莫失莫忘  wrote:
>
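> For a Flink 1.10 job, memory is monitored through four metrics: Heap, NonHeap, Direct and Mapped. Is managed
> memory usage included in any of these four? If so, which one? If not, how can managed memory usage be monitored?
> PS:
> According to the official docs, managed memory is not in any of the four.
> From my own job monitoring, managed memory does not seem to be in any of the four either.
>
>
> The problem I am seeing: for Flink memory monitoring I use sum = Heap+NonHeap+Direct+Mapped as used memory. Even when used memory is a small
> fraction of the maximum memory, increasing memory raises job throughput significantly. I suspect this sum does not include managed memory.
>
> Memory usage is shown in the figure: taskmanager.memory.process.size=8192mb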
>
>
>
>
>
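
For context on the sum discussed in this thread: the Heap, NonHeap, Direct and Mapped figures correspond to what the JVM itself reports through its management beans. The sketch below, using only the JDK, computes the same kind of sum. It is an illustration of where such numbers come from, not Flink's metric code, and the class name JvmMemorySum is made up for the example. Memory allocated natively outside these pools (for instance by RocksDB) would not show up in it, which is consistent with the suspicion above that the sum excludes managed memory.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class JvmMemorySum {

    /** Returns the bytes used by the named NIO buffer pool ("direct" or "mapped"), or 0 if absent. */
    static long bufferPoolUsed(String poolName) {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals(poolName)) {
                // getMemoryUsed() may return -1 when the value is unknown.
                return Math.max(pool.getMemoryUsed(), 0L);
            }
        }
        return 0L;
    }

    /** Heap + NonHeap + Direct + Mapped, as seen by the JVM's own management beans. */
    static long jvmReportedSum() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getHeapMemoryUsage().getUsed()
                + memory.getNonHeapMemoryUsage().getUsed()
                + bufferPoolUsed("direct")
                + bufferPoolUsed("mapped");
    }

    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        System.out.println("heap     = " + memory.getHeapMemoryUsage().getUsed());
        System.out.println("non-heap = " + memory.getNonHeapMemoryUsage().getUsed());
        System.out.println("direct   = " + bufferPoolUsed("direct"));
        System.out.println("mapped   = " + bufferPoolUsed("mapped"));
        System.out.println("sum      = " + jvmReportedSum());
    }
}
```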


Reply: Re: A question about CDC historical data in flink 1.11

2020-08-24 Thread dixingxin...@163.com
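Hi:
Leonard Xu, thanks for the reply.
> 2. Does Debezium load the full historical data by querying the source table over JDBC? Could this put sudden load on the database?
> No. It does not query the source table directly, so it does not lock the table; loading the full history only reads from a binlog offset.
This is exactly what I am unsure about. When I read the Debezium code earlier, I did not find any code that loads the full history over JDBC. Debezium's snapshot appears to store only the table's schema change history, so that when the binlog is consumed again the correct schema can be found for each binlog record and the historical data can be parsed correctly.

My question: if loading the full history only means specifying a binlog offset and reading the binlog from the beginning, then it may be impossible to load the full data set, because binlogs usually have an expiry time and the complete binlog is not retained. If my understanding is right, CDC in Flink 1.11
currently cannot load the full historical data.

As I understand it, there are only two ways to load the full data:
1. Pull all the data from the source table over JDBC.
2. Initialize the source table's data into a Kafka
topic (with the topic set to compact mode), then consume the binlog and write incremental data into this topic so that the topic stays consistent with the source table. When the Flink job starts, it consumes from the topic's earliest
offset and obtains the full historical data.

I am not sure whether my understanding is correct and would appreciate your help.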



Best,
Xingxing Di
 
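From: Leonard Xu
Sent: 2020-08-25 10:03
To: user-zh
Subject: Re: A question about CDC historical data in flink 1.11
Hello
 
> CDC in Flink 1.11 supports loading historical data. I would like to confirm two things:
> 1. Does it use Debezium under the hood to load the historical data?
Flink supports two CDC formats, debezium-json and canal-json. Debezium and
Canal are both CDC systems. Put simply, they write a database table's binlog, as JSON in their respective formats, to a message queue such as Kafka.
As the downstream system, Flink can consume that CDC data. Both CDC tools support loading historical data.
In addition, Jark has open-sourced a Flink CDC connector at Ververica
[1], which uses Debezium to read a database's CDC data directly, without setting up a separate CDC system.
 
> 2. Does Debezium load the full historical data by querying the source table over JDBC? Could this put sudden load on the database?
No. It does not query the source table directly, so it does not lock the table; loading the full history only reads from a binlog offset.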
 
 
Best
Leonard
[1] https://github.com/ververica/flink-cdc-connectors 



Re: Reply: Flink on Yarn: after a job restarts, jar files in lib seem to be missing in the taskmanager

2020-08-24 Thread Yang Wang
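Files under $FLINK_HOME/lib are always uploaded to HDFS and registered as LocalResources.
After a JM/TM failover they can be downloaded and loaded again.

Please send the JM/TM logs that report ClassNotFound so that the problem is easier to analyze.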

Best,
Yang

On Tue, Aug 25, 2020 at 9:30 AM xiao cai wrote:

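> Hi
>
> It does reproduce reliably: after a failover, classes from the jar files in lib can no longer be found, and only a restart helps. However, I started on-yarn in CLI mode and have not tried per-job or application mode. In the next couple of days I plan to try application mode with the jar path pointing to HDFS and see whether it still reproduces.
>
>
> Best,
> xiao cai
>
>
>  Original message
> From: Congxian Qiu
> To: user-zh
> Sent: Monday, August 24, 2020 20:39
> Subject: Re: Reply: Flink on Yarn: after a job restarts, jar files in lib seem to be missing in the taskmanager
>
>
> Hi, in theory, if the first start succeeds, later failovers should also recover normally. Does this reproduce reliably for you? If it does, it may be a bug.
> Best,
> Congxian
>
> On Thu, Aug 20, 2020 at 2:27 PM xiao cai wrote:
> > Hi:
> > Thanks for the reply, that is indeed one approach.
> >
> > Still, I feel that when the first container starts, the jar files in the local lib could be uploaded to HDFS, and containers created by later failovers could all fetch them from HDFS, which should avoid this problem. The community seems to have optimized jar copying in 1.11; I am still reading up on this and will share any progress.
> >
> > Best,
> > xiao cai
> >
> > Original message
> > From: 范超
> > To: user-zh@flink.apache.org
> > Sent: Thursday, August 20, 2020 09:11
> > Subject: Reply: Flink on Yarn: after a job restarts, jar files in lib seem to be missing in the taskmanager
> >
> > I previously enabled job failover restart and also found that YARN simply requests a new container while the old container is not handled any further, which keeps producing the error you describe: the old container has no bound task executor, "No TaskExecutor registered under containe_".
> > In the end I wrote a script that reloads the application via savepoint. Hope this helps.
> >
> > -Original message-
> > From: xiao cai [mailto:flin...@163.com]
> > Sent: Wednesday, August 19, 2020 12:50
> > To: user-zh <user-zh@flink.apache.org>
> > Subject: Flink on Yarn: after a job restarts, jar files in lib seem to be missing in the taskmanager
> >
> > As the subject says: with Flink on Yarn, after a job restarts, jar files in lib seem to be missing in the taskmanager.
> > My job is kafka source -> hbase sink.
> > After the job obtains a new container, class files that previously existed are missing when the job starts. I suspect the newly requested container did not get the resources in lib. Should the resources in lib be put on HDFS? How should this be configured?
> > Best
> > xiao cai
> >
> > Error stack: 2020-08-19 11:23:08,099 INFO >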
> org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. >
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] -
> > Received 1 containers with resource , 1 pending >
> container requests. 2020-08-19 11:23:08,100 INFO >
> org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor >
> container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 >
> with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb >
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), >
> taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, >
> networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb >
> (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), >
> jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO
> > org.apache.flink.yarn.YarnResourceManager [] - Creating container launch
> > context for TaskManagers 2020-08-19 11:23:08,101 INFO >
> org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers >
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> > Removing container request Capability[ vCores:4>]Priority[1]. > 2020-08-19 11:23:08,102 INFO
> org.apache.flink.yarn.YarnResourceManager [] - > Accepted 1 requested
> containers, returned 0 excess containers, 0 pending > container requests of
> resource . 2020-08-19 > 11:23:08,102 INFO >
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - >
> Processing Event EventType: START_CONTAINER for Container >
> container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR >
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> > [] - Unhandled exception. >
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> > No TaskExecutor registered under >
> container_e07_1596440446172_0094_01_68. at >
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) >
> ~[?:1.8.0_191] at >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at >
> 

Re: Flink Couchbase

2020-08-24 Thread Yang Wang
I think at least you have two different exceptions.

> java.lang.Exception: Container released on a *lost* node
This usually means a Yarn nodemanager is down. So all the containers
running on this node will be
released and rescheduled to a new one. If you want to figure out the root
cause, you need to check
the Yarn nodemanager logs.

> java.lang.OutOfMemoryError: Metaspace
Could you check the value of flink configuration
"taskmanager.memory.jvm-metaspace.size"? If it is
too small, increasing it will help. Usually, 256m is enough for most cases.


Best,
Yang

On Tue, Aug 25, 2020 at 4:51 AM Vijayendra Yadav wrote:

> Another one -
>
> Exception in thread "FileCache shutdown hook"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "FileCache shutdown hook"
>
> Regards,
> Vijay
>
> On Mon, Aug 24, 2020 at 1:04 PM Vijayendra Yadav 
> wrote:
>
>> Actually got this message in rolledover container logs:
>>
>> [org.slf4j.impl.Log4jLoggerFactory]
>> Exception in thread "cb-timer-1-1" java.lang.OutOfMemoryError: Metaspace
>> Exception in thread "Thread-16" java.lang.OutOfMemoryError: Metaspace
>> Exception in thread "TransientBlobCache shutdown hook" 
>> java.lang.OutOfMemoryError: Metaspace
>> Exception in thread "FileChannelManagerImpl-io shutdown hook" 
>> java.lang.OutOfMemoryError: Metaspace
>> Exception in thread "Kafka Fetcher for Source: flink-kafka-consumer -> Map 
>> -> Filter -> Map -> Sink: s3-sink-raw (2/3)" java.lang.OutOfMemoryError: 
>> Metaspace
>> Exception in thread "FileCache shutdown hook" java.lang.OutOfMemoryError: 
>> Metaspace
>>
>> Any suggestions on how to fix it ?
>>
>>
>>
>> On Mon, Aug 24, 2020 at 12:53 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Running a flink job on Yarn, I am trying to make connections to
>>> couchbase DB in one of my map functions in Flink Streaming job. But my task
>>> manager containers keep failing
>>> and keep assigning new containers and not giving me an opportunity to
>>> get any useful logs.
>>>
>>>  val cluster = Cluster.connect("host", "user", "pwd")
>>>  val bucket = cluster.bucket("bucket")
>>>  val collection = bucket.defaultCollection
>>>
>>> Only thing I see is yarn exception:
>>>
>>> java.lang.Exception: Container released on a *lost* node
>>> at org.apache.flink.yarn.YarnResourceManager
>>> .lambda$onContainersCompleted$0(YarnResourceManager.java:343)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>>> AkkaRpcActor.java:397)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>>> AkkaRpcActor.java:190)
>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>> AkkaRpcActor.java:152)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123
>>> )
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:
>>> 21)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 170)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:
>>> 171)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>> ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:
>>> 1979)
>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>> Could you please provide any insight on how to get logs. And why a
>>> simple connection will not work.
>>>
>>> Note: it works in my local system yarn.
>>>
>>> Regards,
>>> Vijay
>>>
>>


Reply: How to sink invalid data from flatmap

2020-08-24 Thread 范超
Thanks a lot Jake for the quick response

From: Jake [mailto:ft20...@qq.com]
Sent: Tuesday, August 25, 2020 11:31
To: 范超
Cc: user
Subject: Re: How to sink invalid data from flatmap

Hi fanchao

Yes. I suggest that.

Jake


On Aug 25, 2020, at 11:20 AM, 范超 <fanc...@mgtv.com> wrote:

Thanks Jake. But can I implement the output-tag function in my
flatmap function rather than in a process function? I checked the parameters of
flatmap and there is no 'context', so does that mean I have to use process to rewrite
my flatmap function?

From: Jake [mailto:ft20...@qq.com]
Sent: Tuesday, August 25, 2020 11:06
To: 范超 <fanc...@mgtv.com>
Cc: user <user@flink.apache.org>
Subject: Re: How to sink invalid data from flatmap

Hi fanchao

use side output, see[1]

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

Jake



On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:

Hi,
I'm using a custom flatmap function to validate Kafka JSON string
messages. If a message can be transformed into a POJO (using GSON),
it goes on to the next sink step.
If it cannot be parsed as a POJO, GSON throws
"com.google.gson.JsonSyntaxException". In my custom flatmap function I
just catch this exception and move on, so the invalid JSON message is
simply dropped.

Now I want to keep the invalid messages and sink the original Kafka
JSON string to another table, but I don't know how to implement this in my custom
flatmap function, because the RichMapFunction restricts the collect type.
Could someone give me some advice please?
Chao Fan



[ANNOUNCE] Apache Flink 1.10.2 released

2020-08-24 Thread Zhu Zhu
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.2, which is the second bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/08/25/release-1.10.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu



Re: How to sink invalid data from flatmap

2020-08-24 Thread Jake
Hi fanchao

Yes. I suggest that.

Jake

> On Aug 25, 2020, at 11:20 AM, 范超  wrote:
> 
> Thanks Jake. But can I implement the output-tag function in my
> flatmap function rather than in a process function? I checked the parameters of
> flatmap and there is no 'context', so does that mean I have to use process to rewrite
> my flatmap function?
>
> From: Jake [mailto:ft20...@qq.com]
> Sent: Tuesday, August 25, 2020 11:06
> To: 范超
> Cc: user
> Subject: Re: How to sink invalid data from flatmap
>  
> Hi fanchao
>  
> use side output, see[1]
>  
> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
>  
> 
>  
> Jake
> 
> 
> On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:
>
> Hi,
> I'm using a custom flatmap function to validate Kafka JSON string
> messages. If a message can be transformed into a POJO (using GSON),
> it goes on to the next sink step.
> If it cannot be parsed as a POJO, GSON throws
> "com.google.gson.JsonSyntaxException". In my custom flatmap function I
> just catch this exception and move on, so the invalid JSON message
> is simply dropped.
>
> Now I want to keep the invalid messages and sink the original
> Kafka JSON string to another table, but I don't know how to implement this in my
> custom flatmap function, because the RichMapFunction restricts the collect
> type.
> Could someone give me some advice please?
> Thanks in advance!
> Chao Fan



Reply: How to sink invalid data from flatmap

2020-08-24 Thread 范超
Thanks Jake. But can I implement the output-tag function in my
flatmap function rather than in a process function? I checked the parameters of
flatmap and there is no 'context', so does that mean I have to use process to rewrite
my flatmap function?

From: Jake [mailto:ft20...@qq.com]
Sent: Tuesday, August 25, 2020 11:06
To: 范超
Cc: user
Subject: Re: How to sink invalid data from flatmap

Hi fanchao

use side output, see[1]

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

Jake


On Aug 25, 2020, at 10:54 AM, 范超 <fanc...@mgtv.com> wrote:

Hi,
I'm using a custom flatmap function to validate Kafka JSON string
messages. If a message can be transformed into a POJO (using GSON),
it goes on to the next sink step.
If it cannot be parsed as a POJO, GSON throws
"com.google.gson.JsonSyntaxException". In my custom flatmap function I
just catch this exception and move on, so the invalid JSON message is
simply dropped.

Now I want to keep the invalid messages and sink the original Kafka
JSON string to another table, but I don't know how to implement this in my custom
flatmap function, because the RichMapFunction restricts the collect type.
Could someone give me some advice please?
Thanks in advance!
Chao Fan



Re: flink on yarn configuration question

2020-08-24 Thread Yang Wang
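Please confirm that the upd_security queue actually exists. Also, is your YARN cluster's scheduler the CapacityScheduler or the FairScheduler?
With the FairScheduler you need to specify the full queue name, not just the name of the leaf node.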


Best,
Yang

On Mon, Aug 24, 2020 at 10:55 AM 赵一旦 wrote:

> For example, today I tried this command: ./bin/yarn-session.sh -nm test_flink -q -qu upd_security -s 1
> -tm 3024MB -jm 3024MB
> I also set export HADOOP_USER_NAME=xxx
> , and at startup this log line appears: org.apache.flink.runtime.security.modules.HadoopModule  -
> Hadoop user set to upd_security (auth:SIMPLE).
>
> Then it fails with:
>
> 2020-08-24 10:52:31 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli  -
> Error while running the Flink session.
> java.lang.RuntimeException: Couldn't get cluster description
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.getClusterDescription(YarnClusterDescriptor.java:1254)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:534)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
> Caused by: java.lang.NullPointerException: null
> at
>
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getChildQueues(YarnClientImpl.java:587)
> at
>
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:557)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.getClusterDescription(YarnClusterDescriptor.java:1247)
> ... 7 common frames omitted
>
> 
>  The program finished with the following exception:
>
> java.lang.RuntimeException: Couldn't get cluster description
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.getClusterDescription(YarnClusterDescriptor.java:1254)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:534)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
>
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
> Caused by: java.lang.NullPointerException
> at
>
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getChildQueues(YarnClientImpl.java:587)
> at
>
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getAllQueues(YarnClientImpl.java:557)
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.getClusterDescription(YarnClusterDescriptor.java:1247)
> ... 7 more
>
>
>
>
>
> On Mon, Aug 24, 2020 at 10:00 AM caozhen wrote:
>
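> > The error means there are not enough vcores when the AM requests resources.
> >
> > 1. Check whether the current queue has enough vcores.
> > 2. Check the maximum number of applications the current queue allows.
> >
> > When I ran into this before, the cause was that the queue's resources were not configured properly.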
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Reply: How to sink invalid data from flatmap

2020-08-24 Thread 范超
Thanks. Using ctx.output() inside the process method solved my problem,
but does that mean my custom flatmap function has to be retired?
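
The routing idea behind the side-output approach can be sketched without Flink. The example below is a plain-Java analogy, not Flink API: processElement plays the role of a process function's element handler, mainOut stands in for the regular collector, and sideOut stands in for ctx.output(tag, value). Integer.parseInt is only a stand-in for the GSON parsing step, and all names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SideOutputSketch {

    /** Parses one raw message: valid values go to mainOut, the raw invalid string goes to sideOut. */
    static void processElement(String raw, Consumer<Integer> mainOut, Consumer<String> sideOut) {
        try {
            // Stand-in for the real parsing step (GSON in the thread).
            mainOut.accept(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            // Keep the original string instead of silently dropping it.
            sideOut.accept(raw);
        }
    }

    public static void main(String[] args) {
        List<Integer> valid = new ArrayList<>();
        List<String> invalid = new ArrayList<>();
        for (String raw : new String[] {"1", "oops", "3"}) {
            processElement(raw, valid::add, invalid::add);
        }
        System.out.println("main output: " + valid);
        System.out.println("side output: " + invalid);
    }
}
```

Because the two outputs have different element types (the parsed value and the raw string), a single flatmap collector cannot carry both. A process function can still emit zero, one or many records per input, so the flatmap logic can usually be moved into it largely unchanged.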

From: Yun Tang [mailto:myas...@live.com]
Sent: Tuesday, August 25, 2020 10:58
To: 范超; user
Subject: Re: How to sink invalid data from flatmap

Hi Chao

I think side output [1] might meet your requirements.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

Best
Yun Tang

From: 范超 mailto:fanc...@mgtv.com>>
Sent: Tuesday, August 25, 2020 10:54
To: user mailto:user@flink.apache.org>>
Subject: How to sink invalid data from flatmap


Hi,

I'm using a custom flatmap function to validate JSON string messages from
Kafka. If a message can be parsed into a POJO (using Gson), it goes on to the
next sink step.

If it cannot be parsed into a POJO, Gson throws
"com.google.gson.JsonSyntaxException". My custom flatmap function just catches
this exception and moves on, so the invalid JSON message is silently dropped.

Now I want to keep these invalid messages and sink the original Kafka JSON
string to another table, but I don't know how to do that in my custom flatmap
function, because the rich function limits the type its collector accepts.

Could someone give me some advice please?

Thanks in advance!

Chao Fan




Re: Error: Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-24 Thread song wang
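Yes, the jobs were submitted in yarn-session mode. There are a lot of logs; I'll keep digging through them.

On Tue, Aug 25, 2020 at 10:55 AM Xintong Song wrote:

> From your description, I understand that you started a Flink session on YARN and then
> submitted jobs to it. Submission and execution worked at first, and later jobs started to fail?
>
> The actual cause can only be determined from the logs. It could be the ResourceManager, or it could be an HA or Akka problem; a description of the symptoms alone is not enough to tell.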
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Aug 25, 2020 at 10:23 AM song wang 
> wrote:
>
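> >
> > Hi, this error only appeared after the job had been running for a long time. Everything ran normally before that, and once the error appeared no more jobs could be submitted. It looks like something went wrong with the ResourceManager. Can the ResourceManager die on its own?
> > Also, the environment is still up. Is there a way to check whether the JobMaster and the ResourceManager are healthy?
> >
> > On Tue, Aug 25, 2020 at 9:46 AM Xintong Song wrote:
> >
> > > From the logs, the JobMaster cannot connect to the ResourceManager. These two components should be in the same process and communicate locally via Akka.
> > > You need to look at the complete logs to see whether the RM started successfully and registered with Akka's actor system, and whether the registered path matches the path the JM is trying to connect to.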
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Aug 24, 2020 at 3:41 PM song wang 
> > > wrote:
> > >
> > > > Hi all, Flink is running on YARN and occasionally reports that it cannot resolve the
> > > > ResourceManager address, even though a Flink process can be found on the corresponding host. What could be the cause?
> > > > Flink version: 1.9.0
> > > >
> > > > Part of the log:
> > > > ```
> > > > 2020-08-24 15:11:31,566 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> > > > 2020-08-24 15:11:51,606 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> > > > 2020-08-24 15:12:11,645 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> > > > 2020-08-24 15:12:31,687 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> > > > 2020-08-24 15:12:51,727 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> > > > 2020-08-24 15:13:08,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (ff0ab7ec3e577a8e0c69e1c8454e5b72) switched from state RUNNING to FAILING.
> > > > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 9, slots allocated: 0, previous allocation IDs: [], execution status: completed exceptionally: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException/java.util.concurrent.CompletableFuture@28d7ad5d[Completed exceptionally], incomplete: java.util.concurrent.CompletableFuture@663cdf7e[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@2058a7e9[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@5c1121c8[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@49b9c252[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@497e3334[Not completed, 1 dependents], incomplete:

Re: Is it possible to use TiKV as a distributed backend for Flink

2020-08-24 Thread Yun Tang
Hi

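TiKV is itself distributed and multi-replica, comparable to HBase. So rather than moving it toward Flink's built-in state
backends, it makes more sense to treat it the way Flink jobs treat HBase: as an external store they read from and write to. Several Flink jobs that write to TiKV then share data that way.

If you did want to turn TiKV into a Flink
state backend, TiKV's distributed architecture, multi-replica mechanism, and network transfer (instead of local disk access) would all become drawbacks, or at least features that are no longer needed.
You would end up with today's Flink + RocksDB state backend architecture, all the more so because TiKV is itself built on RocksDB. Overall there is not much to gain.

Best,
Yun Tang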

From: Congxian Qiu 
Sent: Monday, August 24, 2020 20:17
To: user-zh 
Subject: Re: 有没有可能使用tikv作为flink 分布式的backend

Hi
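   A StateBackend can be thought of as a KV store plus a snapshot process, where the snapshot process is responsible for backing up the data currently in the KV
store. In theory any KV store could serve as a StateBackend, but adding a new StateBackend requires implementing the corresponding
snapshot/restore logic.

   However, sharing state across multiple Flink jobs is not supported in Flink.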
Best,
Congxian
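
To make the "KV store plus snapshot" description above concrete, here is a minimal plain-Java sketch. It is a toy model only: `ToyKvState` and its methods are hypothetical names chosen for illustration, and it does not implement or resemble Flink's actual StateBackend interfaces.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: this is NOT Flink's StateBackend interface. */
final class ToyKvState {
    private final Map<String, Long> store = new HashMap<>();

    void put(String key, long value) {
        store.put(key, value);
    }

    Long get(String key) {
        return store.get(key);
    }

    /** Snapshot: copy the current contents so later writes do not affect the backup. */
    Map<String, Long> snapshot() {
        return new HashMap<>(store);
    }

    /** Restore: discard the current contents and reload them from a snapshot. */
    void restore(Map<String, Long> snapshot) {
        store.clear();
        store.putAll(snapshot);
    }

    public static void main(String[] args) {
        ToyKvState state = new ToyKvState();
        state.put("clicks", 3L);
        Map<String, Long> checkpoint = state.snapshot();
        state.put("clicks", 7L);   // progress made after the snapshot
        state.restore(checkpoint); // simulated recovery after a failure
        System.out.println(state.get("clicks")); // prints 3
    }
}
```

The snapshot here is a full in-memory copy; a real backend would have to write it to durable storage, which is the part that needs backend-specific snapshot/restore logic.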


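On Fri, Aug 21, 2020 at 6:33 PM wxpcc wrote:

> Part of our project needs to share state between multiple Flink jobs.
>
> As the subject says: TiKV is itself based on RocksDB, so could it be extended into a distributed backend?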
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to sink invalid data from flatmap

2020-08-24 Thread Jake
Hi Fan Chao,

Use side output, see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
 


Jake

> On Aug 25, 2020, at 10:54 AM, 范超  wrote:
> 
> Hi, 
> I'm using a custom flatmap function to validate JSON string messages from
> Kafka. If a message can be parsed into a POJO (using Gson), it goes on to the
> next sink step.
> If it cannot be parsed into a POJO, Gson throws
> "com.google.gson.JsonSyntaxException". My custom flatmap function just catches
> this exception and moves on, so the invalid JSON message is silently dropped.
>
> Now I want to keep these invalid messages and sink the original Kafka JSON
> string to another table, but I don't know how to do that in my custom flatmap
> function, because the rich function limits the type its collector accepts.
> Could someone give me some advice please?
> Thanks in advance!
> Chao Fan



Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Paul Lam
Hi Aljoscha,

I'm leaning slightly towards keeping the 0.10 connector, because in my
observation Kafka 0.10 still has a steady user base.

But if we drop the 0.10 connector, can we ensure that users will be able to
migrate smoothly to the 0.11 connector or the universal connector?

If I remember correctly, the universal connector is compatible with 0.10 
brokers, but I want to double check that.

Best,
Paul Lam

> On Aug 24, 2020, at 22:46, Aljoscha Krettek wrote:
> 
> Hi all,
> 
> this thought came up on FLINK-17260 [1] but I think it would be a good idea 
> in general. The issue reminded us that Kafka didn't have an 
> idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have had 
> the "modern" Kafka connector that roughly follows new Kafka releases for a 
> while and this one supports Kafka cluster versions as far back as 0.10.2.0 (I 
> believe).
> 
> What are your thoughts on removing support for older Kafka versions? And yes, 
> I know that we had multiple discussions like this in the past but I'm trying 
> to gauge the current sentiment.
> 
> I'm cross-posting to the user-ml since this is important for both users and 
> developers.
> 
> Best,
> Aljoscha
> 
> [1] https://issues.apache.org/jira/browse/FLINK-17260



Re: How to sink invalid data from flatmap

2020-08-24 Thread Yun Tang
Hi Chao

I think side output [1] might meet your requirements.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

Best
Yun Tang
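
As a rough illustration of the routing idea behind side outputs, here is a plain-Java model that deliberately does not use the Flink API. `SideOutputModel`, `Routed`, and `route` are hypothetical names, and the integer parser is only a stand-in for the Gson call in the question. In an actual job, the documentation linked in [1] describes doing the equivalent inside a process function by calling ctx.output() with an output tag for the records that fail to parse.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of side-output routing; it does not use the Flink API. */
final class SideOutputModel {

    /** Holds the main output (parsed records) and the side output (raw inputs that failed). */
    static final class Routed<T> {
        final List<T> main = new ArrayList<>();
        final List<String> invalid = new ArrayList<>();
    }

    /**
     * Parses every raw message. Successfully parsed values go to the main output;
     * messages whose parser throws are kept, unmodified, in the side output.
     */
    static <T> Routed<T> route(List<String> rawMessages, Function<String, T> parser) {
        Routed<T> out = new Routed<>();
        for (String raw : rawMessages) {
            try {
                out.main.add(parser.apply(raw));
            } catch (RuntimeException e) {
                // In a Flink job this is the spot where the raw string would be
                // emitted to the side output instead of being dropped.
                out.invalid.add(raw);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Integer parsing is only a stand-in for the JSON-to-POJO step.
        Routed<Integer> result = route(Arrays.asList("1", "oops", "3"), s -> Integer.parseInt(s));
        System.out.println(result.main);    // prints [1, 3]
        System.out.println(result.invalid); // prints [oops]
    }
}
```

The point of the model is that the failing input is kept as the original string, so it can be written to a different table than the parsed records.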

From: 范超 
Sent: Tuesday, August 25, 2020 10:54
To: user 
Subject: How to sink invalid data from flatmap


Hi,

I'm using a custom flatmap function to validate JSON string messages from
Kafka. If a message can be parsed into a POJO (using Gson), it goes on to the
next sink step.

If it cannot be parsed into a POJO, Gson throws
"com.google.gson.JsonSyntaxException". My custom flatmap function just catches
this exception and moves on, so the invalid JSON message is silently dropped.

Now I want to keep these invalid messages and sink the original Kafka JSON
string to another table, but I don't know how to do that in my custom flatmap
function, because the rich function limits the type its collector accepts.

Could someone give me some advice please?

Thanks in advance!

Chao Fan




Re: Error: Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-24 Thread Xintong Song
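From your description, I understand that you started a Flink session on YARN and then
submitted jobs to it. Submission and execution worked at first, and later jobs started to fail?

The actual cause can only be determined from the logs. It could be the ResourceManager, or it could be an HA or Akka problem; a description of the symptoms alone is not enough to tell.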


Thank you~

Xintong Song



On Tue, Aug 25, 2020 at 10:23 AM song wang  wrote:

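>
> Hi, this error only appeared after the job had been running for a long time. Everything ran normally before that, and once the error appeared no more jobs could be submitted. It looks like something went wrong with the ResourceManager. Can the ResourceManager die on its own?
> Also, the environment is still up. Is there a way to check whether the JobMaster and the ResourceManager are healthy?
>
> On Tue, Aug 25, 2020 at 9:46 AM Xintong Song wrote:
>
> > From the logs, the JobMaster cannot connect to the ResourceManager. These two components should be in the same process and communicate locally via Akka.
> > You need to look at the complete logs to see whether the RM started successfully and registered with Akka's actor system, and whether the registered path matches the path the JM is trying to connect to.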
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Aug 24, 2020 at 3:41 PM song wang 
> > wrote:
> >
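> > > Hi all, Flink is running on YARN and occasionally reports that it cannot resolve the
> > > ResourceManager address, even though a Flink process can be found on the corresponding host. What could be the cause?
> > > Flink version: 1.9.0
> > >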

How to sink invalid data from flatmap

2020-08-24 Thread 范超
Hi,
I'm using a custom flatmap function to validate JSON string messages from
Kafka. If a message can be parsed into a POJO (using Gson), it goes on to the
next sink step.
If it cannot be parsed into a POJO, Gson throws
"com.google.gson.JsonSyntaxException". My custom flatmap function just catches
this exception and moves on, so the invalid JSON message is silently dropped.

Now I want to keep these invalid messages and sink the original Kafka JSON
string to another table, but I don't know how to do that in my custom flatmap
function, because the rich function limits the type its collector accepts.
Could someone give me some advice please?
Thanks in advance!
Chao Fan



Re: Diagnosing Flink TaskManagers killed by YARN for exceeding the container memory limit

2020-08-24 Thread Xintong Song
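First, a TaskManager that is killed for exceeding its memory can only have been pushed over the limit by native memory. Flink sets the JVM's -Xmx parameter, so heap
memory cannot exceed its budget; if the heap runs short you only get an OOM. Investigating heap dumps is therefore the wrong direction.

The way to address this is to increase containerized.heap-cutoff-ratio. This parameter reserves a fraction of the YARN container as
native memory and lets the JVM use only the remainder. Native memory is mainly used for Java native method calls, the JVM's
own overhead (class metadata, thread stacks, and so on), and RocksDB. Flink cannot control how much of this memory is used; it can only reserve enough of it to keep the container
from exceeding its limit.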
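
The reservation described above is simple arithmetic, sketched here in plain Java. This is only an illustration under assumptions: the ratio 0.25 and the 600 MB minimum are values I believe were the defaults of containerized.heap-cutoff-ratio and containerized.heap-cutoff-min in older Flink releases, and a Blink-based build may differ, so check the configuration of your own version. `HeapCutoffSketch` and its method names are hypothetical.

```java
/** Arithmetic sketch of the container cutoff; not Flink code. */
final class HeapCutoffSketch {

    /** Memory (MB) reserved for native use: the larger of the ratio-based share and the minimum. */
    static long cutoffMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        long byRatio = (long) (containerMb * cutoffRatio);
        return Math.max(byRatio, minCutoffMb);
    }

    /** Memory (MB) left for the JVM after the cutoff has been set aside. */
    static long remainingForJvmMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        return containerMb - cutoffMb(containerMb, cutoffRatio, minCutoffMb);
    }

    public static void main(String[] args) {
        long container = 36L * 1024; // the 36 GB container size mentioned in the thread
        // 0.25 and 600 are assumed values, used here only for illustration.
        System.out.println(cutoffMb(container, 0.25, 600));          // prints 9216
        System.out.println(remainingForJvmMb(container, 0.25, 600)); // prints 27648
        // A larger ratio reserves more native memory and leaves less for the JVM.
        System.out.println(remainingForJvmMb(container, 0.4, 600));  // prints 22119
    }
}
```

Raising the ratio trades JVM memory for native headroom, which is why it helps when native consumers such as RocksDB push the container over its limit.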

Thank you~

Xintong Song



On Mon, Aug 24, 2020 at 8:56 PM 柯四海 <2693711...@qq.com> wrote:

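> I'm not running a test. The Flink build at my company was compiled by someone else from the Blink branch. I have recently been planning to switch to Flink 1.11 anyway, so I'll give Flink 1.11
> a try.
>
> ------ Original message ------
> From: "user-zh" <qcx978132...@gmail.com>
> Sent: Monday, August 24, 2020, 8:22 PM
> To: "user-zh"
> Subject: Re: Diagnosing Flink TaskManagers killed by YARN for exceeding the container memory limit
>
> Hi
>  I'm curious why you are testing on the Blink branch rather than on the latest 1.11?
> Best,
> Congxian
>
> On Mon, Aug 24, 2020 at 5:58 PM 柯四海 <2693711...@qq.com> wrote:
>
> > Hi all,
> > I'm running some real-time jobs on a Flink build compiled from the Blink branch (1.5) on GitHub, and the TaskManagers
> > get killed by YARN for exceeding the container memory limit.
> > Does anyone have a good way to track down the cause?
> >
> > What I have tried so far without solving the problem:
> > 1. Increasing the TaskManager memory
> >    Change: raised from 8 GB to 36 GB, and switched the state backend from filesystem to RocksDB.
> >    Result: the TaskManager ran several hours longer, but was still killed by YARN for exceeding memory.
> > 2. Dumping the TaskManager heap to see which objects use a lot of memory
> >    Action: jmap -dump
> >    Result: before the dump finished, the TaskManager was killed for missing heartbeats.
> >    (I tried changing the heartbeat timeout, still without success.)
> > 3. Using the debugging approach from the official site, as follows, but no dump file was produced.
> > 4. Setting containerized.heap-cutoff-ratio, hoping to trigger an OOM and thus get a dump file, but the parameter does not seem to take effect.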
> 


Re: How to monitor managed memory usage in Flink 1.10

2020-08-24 Thread Xintong Song
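The TaskManager metrics shown in the Flink web UI are currently problematic: the metrics do not map well onto the individual parts of the TaskManager
memory model. The community is discussing how to improve this; see FLIP-112.

As for managed memory, as you said, it is not included in the metrics above.

Monitoring managed memory usage is actually not very necessary:

   - For batch jobs, however much managed memory is configured will be used up. Operators decide how large a buffer to request based on the size of
     managed memory, so no memory is left unused.
   - For streaming jobs:
      - With the RocksDB state backend:
         - By default, RocksDB also decides how much memory to request based on the size of managed memory.
         - In addition, for compatibility with earlier versions, RocksDB can also be set up not to size its memory by managed memory. In that case you can monitor
           RocksDB's own metrics to judge memory usage.
      - With other state backends, managed memory is not used and should be configured to 0.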
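
For context on why a sum of Heap, NonHeap, Direct, and Mapped can miss memory, the sketch below reads the corresponding values from the JVM's standard management beans. It assumes, without confirmation from this thread, that Flink's four metrics are derived from these same JVM beans; `JvmMemoryProbe` is a hypothetical name. Native allocations made outside the heap and outside the JVM's buffer pools do not appear in any of these values.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

/** Reads the memory figures the JVM itself reports; not Flink code. */
final class JvmMemoryProbe {

    /** Returns used bytes for heap, non-heap, and each JVM buffer pool (such as "direct" and "mapped"). */
    static Map<String, Long> usedBytes() {
        Map<String, Long> used = new LinkedHashMap<>();
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        used.put("heap", memory.getHeapMemoryUsage().getUsed());
        used.put("non-heap", memory.getNonHeapMemoryUsage().getUsed());
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            used.put(pool.getName(), pool.getMemoryUsed());
        }
        return used;
    }

    public static void main(String[] args) {
        usedBytes().forEach((name, bytes) -> System.out.println(name + " = " + bytes + " bytes"));
    }
}
```

Comparing such a sum against the configured process size can therefore understate real usage whenever a component allocates memory natively.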


Thank you~

Xintong Song



On Mon, Aug 24, 2020 at 5:53 PM 莫失莫忘  wrote:

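> For a Flink 1.10 job there are four memory metrics: Heap, NonHeap, Direct, and Mapped. Is managed
> memory usage included in any of these four? If so, which one? If not, how can I monitor managed memory usage?
> PS:
> According to the official documentation, managed memory is not in any of the four.
> From the job monitoring I built myself, managed memory does not seem to be in any of the four either.
>
>
> The problem I'm facing: for Flink memory monitoring I use sum = Heap + NonHeap + Direct + Mapped as the used memory. Even when used memory is a small fraction of
> the maximum memory, adding memory raises job throughput substantially. I suspect this sum does not include managed memory.
>
> Memory usage is shown in the figure: taskmanager.memory.process.size=8192mb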
>
>
>
>


Re: Question about the default GC for Flink on YARN

2020-08-24 Thread shizk233
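Got it, thanks.

On Tue, Aug 25, 2020 at 9:59 AM Xintong Song wrote:

> taskmanager.sh is the startup script used in standalone mode. Docker mode and the old Kubernetes session
> mode are essentially standalone mode as well, so they also use these scripts.
> For YARN and the new native Kubernetes mode, the startup command is specified by the client when it submits the application to the cluster, and by default no GC
> collector is specified.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 24, 2020 at 5:26 PM shizk233
> wrote:
>
> > Hi all,
> >
> > A question: since 1.10 Flink's default GC has been G1, which can also be seen in the taskmanager.sh script.
> > With the *default settings*, I can see that a local Flink uses G1, but when running on YARN
> > it turns out to use PS. Why is that? Does YARN apply some default settings to applications?
> >
> > I searched for related material but still could not figure out what is going on. I hope someone who knows can help. Thanks!
> >
> > Note: I can make Flink on YARN use G1 as well by setting env.java.opts: -XX:+UseG1GC in flink-conf.yaml.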
> >
>


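Security and multi-tenancy for flink-sql-gateway

2020-08-24 Thread 华小研
flink-sql-gateway currently supports only SSL as an authentication method.

1. We would like to extend its authentication: the JDBC client authenticates to flink-sql-gateway with Kerberos, and flink-sql-gateway submits jobs as the corresponding authenticated Kerberos user, as a step toward multi-tenancy in flink-sql-gateway. Is this idea feasible?

2. The problem we are facing is that there are too few reference examples of Netty + REST + Kerberos authentication. Spark's Livy and Hive's hive-server are based on Servlet + Kerberos, and Storm is based on Netty + Thrift + Kerberos.
How should Kerberos authentication for a REST service that is not Servlet-based be designed and implemented? Can anyone offer some help?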



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Error: Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-24 Thread song wang
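Hi, this error only appeared after the job had been running for a long time. Everything ran normally before that, and once the error appeared no more jobs could be submitted. It looks like something went wrong with the ResourceManager. Can the ResourceManager die on its own?
Also, the environment is still up. Is there a way to check whether the JobMaster and the ResourceManager are healthy?

On Tue, Aug 25, 2020 at 9:46 AM Xintong Song wrote:

> From the logs, the JobMaster cannot connect to the ResourceManager. These two components should be in the same process and communicate locally via Akka.
> You need to look at the complete logs to see whether the RM started successfully and registered with Akka's actor system, and whether the registered path matches the path the JM is trying to connect to.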
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Aug 24, 2020 at 3:41 PM song wang 
> wrote:
>
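> > Hi all, Flink is running on YARN and occasionally reports that it cannot resolve the
> > ResourceManager address, even though a Flink process can be found on the corresponding host. What could be the cause?
> > Flink version: 1.9.0
> >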

Re: Question about historical data with CDC in Flink 1.11

2020-08-24 Thread Leonard Xu
Hello

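> Flink 1.11 CDC supports loading historical data. I'd like to confirm two things:
> 1. Is Debezium used under the hood to load the historical data?
Flink supports two CDC formats, debezium-json and canal-json. Debezium and
Canal are both CDC systems; put simply, they can write a database table's binlog as JSON in their respective formats to a message queue such as Kafka.
Flink, as the downstream system, can then consume the corresponding CDC data. Both CDC tools support loading historical data.
In addition, Jark has open-sourced a Flink CDC connector at Ververica
[1] that uses Debezium to read CDC data directly from the database, without setting up a separate CDC system.

> 2. Does Debezium load the full historical data by querying the source table over JDBC? Would that put sudden load on the database?
No. It does not query the source table directly, so it does not lock the table; when loading the full historical data it only reads from an offset in the binlog.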

Best
Leonard
[1] https://github.com/ververica/flink-cdc-connectors 


Re: Question about the default GC for Flink on YARN

2020-08-24 Thread Xintong Song
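taskmanager.sh is the startup script used in standalone mode. Docker mode and the old Kubernetes session
mode are essentially standalone mode as well, so they also use these scripts.
For YARN and the new native Kubernetes mode, the startup command is specified by the client when it submits the application to the cluster, and by default no GC
collector is specified.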
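
When no collector is specified on the command line, the JVM chooses one itself. One way to see which collector a given JVM actually ended up with is to ask its management beans, as in this small sketch (`GcProbe` is a hypothetical name, and this is a generic JVM check rather than anything Flink provides):

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Lists the garbage collectors active in the current JVM; not Flink code. */
final class GcProbe {

    /** Names of the garbage collectors registered in the running JVM. */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // On HotSpot, G1 typically reports names starting with "G1", while the
        // parallel collector reports "PS Scavenge" and "PS MarkSweep".
        System.out.println(collectorNames());
    }
}
```

Running the same check with and without -XX:+UseG1GC shows whether the flag reached the JVM.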

Thank you~

Xintong Song



On Mon, Aug 24, 2020 at 5:26 PM shizk233 
wrote:

> Hi all,
>
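> A question: since 1.10 Flink's default GC has been G1, which can also be seen in the taskmanager.sh script.
> With the *default settings*, I can see that a local Flink uses G1, but when running on YARN it turns out to use PS. Why is that? Does YARN apply some default settings to applications?
>
> I searched for related material but still could not figure out what is going on. I hope someone who knows can help. Thanks!
>
> Note: I can make Flink on YARN use G1 as well by setting env.java.opts: -XX:+UseG1GC in flink-conf.yaml.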
>


Re: Flink 1.11 SQL API: client exits after submitting in per-job mode

2020-08-24 Thread godfrey he
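If you submit via TableEnvironment#execute, you need to set execution.attached=true, or add -d when going through the Flink
CLI.
If you submit via TableEnvironment#executeSql, you need to wait for the job to finish explicitly in your code:
TableResult tableResult = tEnv.executeSql(xxx);
// wait for the job to finish
tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

On Tue, Aug 25, 2020 at 9:34 AM lijufeng2016 <920347...@qq.com> wrote:

> With the Flink 1.11 SQL API, after submitting in per-job
> mode the client exits. The program runs normally on YARN, but the client disconnects from YARN. Is there a way to keep the client from disconnecting?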
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Setting job/task manager memory management in kubernetes

2020-08-24 Thread Yangze Guo
Hi,

You need to define them in "flink-configuration-configmap.yaml".
Please also make sure you've created the config map by executing
"kubectl create -f flink-configuration-configmap.yaml".

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html

Best,
Yangze Guo

On Mon, Aug 24, 2020 at 9:33 PM Sakshi Bansal  wrote:
>
> The flink version is 1.9 and it is a standalone k8s
>
> On Mon 24 Aug, 2020, 17:17 Yangze Guo,  wrote:
>>
>> Hi, Sakshi
>>
>> Could you provide more information about:
>> - What is the Flink version you are using? "taskmanager.heap.size" is
>> deprecated since 1.10[1].
>> - How do you deploy the cluster? In the approach of native k8s[2] or
>> the standalone k8s[3]?
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_migration.html
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Aug 24, 2020 at 6:31 PM Sakshi Bansal  
>> wrote:
>> >
>> > Hello,
>> >
>> > I am trying to set the heap size of job and task manager when deploying 
>> > the job in kubernetes. I have set the jobmanager.heap.size and 
>> > taskmanager.heap.size. However, the custom values are not being used and 
>> > it is creating its own values and starting the job. How can I set custom 
>> > values?
>> >
>> > --
>> > Thanks and Regards
>> > Sakshi Bansal


Re: Since Flink 1.10, when is task off-heap memory used?

2020-08-24 Thread Xintong Song
It should be configured when user code uses off-heap memory.

Off-heap memory used by the Flink framework itself is covered by taskmanager.memory.framework.off-heap.size.
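
As an example of what "user code uses off-heap memory" can look like, the sketch below allocates a direct buffer in plain Java. `OffHeapUserCode` is a hypothetical name, and this is only one common form of off-heap use; a library that allocates native memory on the user code's behalf would fall into the same category.

```java
import java.nio.ByteBuffer;

/** Example of user code that allocates memory outside the Java heap. */
final class OffHeapUserCode {

    /** Allocates a buffer outside the Java heap, as user code inside a task might do. */
    static ByteBuffer allocateOffHeap(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = allocateOffHeap(16 * 1024 * 1024); // 16 MiB of direct memory
        buffer.putLong(0, 42L);
        System.out.println(buffer.isDirect()); // prints true
        System.out.println(buffer.getLong(0)); // prints 42
    }
}
```

If operators in a job do something like this, the amount they allocate is what taskmanager.memory.task.off-heap.size is meant to account for.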

Thank you~

Xintong Song



On Mon, Aug 24, 2020 at 4:27 PM caozhen  wrote:

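> As the subject says, I'd like to ask how to decide on the task off-heap memory setting (taskmanager.memory.task.off-heap.size).
>
> 1. Is it for user code that explicitly uses off-heap memory?
> 2. Or does the Flink framework use off-heap memory under certain circumstances?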
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Error: Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-24 Thread Xintong Song
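From the logs, the JobMaster cannot connect to the ResourceManager. These two components should be in the same process and communicate locally via Akka.
You need to look at the complete logs to see whether the RM started successfully and registered with Akka's actor system, and whether the registered path matches the path the JM is trying to connect to.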


Thank you~

Xintong Song



On Mon, Aug 24, 2020 at 3:41 PM song wang  wrote:

> Hi all, Flink is running on YARN and occasionally reports that it cannot resolve the
> ResourceManager address, even though a Flink process can be found on the corresponding host. What could be the cause?
> Flink version: 1.9.0
>
> Part of the log:
> ```
> 2020-08-24 15:11:31,566 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> 2020-08-24 15:11:51,606 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> 2020-08-24 15:12:11,645 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> 2020-08-24 15:12:31,687 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> 2020-08-24 15:12:51,727 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/resourcemanager)]] after [1 ms]. Message of type [akka.actor.Identify]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply..
> 2020-08-24 15:13:08,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (ff0ab7ec3e577a8e0c69e1c8454e5b72) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 9, slots allocated: 0, previous allocation IDs: [], execution status: completed exceptionally: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException/java.util.concurrent.CompletableFuture@28d7ad5d[Completed exceptionally], incomplete: java.util.concurrent.CompletableFuture@663cdf7e[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@2058a7e9[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@5c1121c8[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@49b9c252[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@497e3334[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@2c7ca21d[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@7936c93b[Not completed, 1 dependents], incomplete: java.util.concurrent.CompletableFuture@7e9a2f1d[Not completed, 1 dependents]
>     at org.apache.flink.runtime.executiongraph.SchedulingUtils.lambda$scheduleEager$1(SchedulingUtils.java:194)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
>     at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:656)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

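Client exits after submitting in per-job mode with the Flink 1.11 SQL API

2020-08-24 Thread lijufeng2016
After I submit a job in per-job mode with the Flink 1.11 SQL API, the client exits. The program runs normally on YARN, but the client disconnects from YARN. Is there a way to keep the client from disconnecting?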



--
Sent from: http://apache-flink.147419.n8.nabble.com/

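Flink Kafka partitioning question

2020-08-24 Thread steven chen
Hi experts, there are a few things I have never understood:
1. FlinkKafkaProducer writes to a Kafka topic with 10 partitions using round-robin partitioning, so every task writes to all downstream partitions in turn. Why do the offsets produced on the 10 partitions still differ?
2. Data skew in a multi-stream join: how can it be fixed?
3. Can Flink SQL repartition?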
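
One possible contribution to the offset differences in question 1, sketched under assumptions rather than taken from the connector: if every producer task keeps its own independent round-robin counter and the tasks emit different numbers of records, the partitions end up with slightly different totals. The class name `RoundRobinOffsets`, the start partition 0 and the record counts below are made up for illustration; the code does not call FlinkKafkaProducer or Kafka.

```java
import java.util.Arrays;

public class RoundRobinOffsets {

    // Assumption: each task starts at partition 0 and cycles through all
    // partitions with its own counter, independently of the other tasks.
    static long[] simulate(int partitions, int[] recordsPerTask) {
        long[] offsets = new long[partitions];
        for (int records : recordsPerTask) {
            for (int i = 0; i < records; i++) {
                offsets[i % partitions]++;
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Three hypothetical tasks that emitted 1003, 1007 and 995 records.
        long[] offsets = simulate(10, new int[] {1003, 1007, 995});
        // prints [302, 302, 302, 301, 301, 300, 300, 299, 299, 299]
        System.out.println(Arrays.toString(offsets));
    }
}
```

In this model the gap between the fullest and the emptiest partition never exceeds the number of tasks, so a much larger gap may point to a different cause, for example keyed records or a custom partitioner.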

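Re: Reply: Flink on Yarn: after a task restarts, the TaskManager seems to lose the jar files in lib

2020-08-24 Thread xiao cai
Hi
Yes, I can reproduce it reliably: after a failover, the class files inside the jars in the lib directory can no longer be found, and the only fix is to restart. I started the job on YARN in CLI mode, though, and have not tried per-job or application mode. In the next couple of days I plan to try application mode with the jar location pointing to HDFS and see whether it still reproduces.


Best,
xiao cai


 Original message 
From: Congxian Qiu
To: user-zh
Sent: Monday, August 24, 2020 20:39
Subject: Re: Reply: Flink on Yarn: after a task restarts, the TaskManager seems to lose the jar files in lib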


Hi,

In theory, if the job can start the first time, later failovers should also be able to recover normally. Can you reproduce this reliably? If so, it may be a bug.

Best,
Congxian

xiao cai wrote on Thu, Aug 20, 2020 at 2:27 PM:

> Hi:
> Thanks for the reply, that is indeed one approach.
>
> Personally, though, I feel the jar files in the local lib directory could be uploaded to HDFS when the first container is started, so that containers started by later failovers all fetch them from HDFS. Then this problem should not occur. It seems the community optimized jar copying in 1.11; I am still reading up on this and will follow up when there is progress.
>
> Best,
> xiao cai
>
> Original message
> From: 范超
> To: user-zh@flink.apache.org
> Sent: Thursday, August 20, 2020 09:11
> Subject: Reply: Flink on Yarn: after a task restarts, the TaskManager seems to lose the jar files in lib
>
> When I enabled failover restart for a job, I also found that YARN simply requested a new container and did nothing further with the old one, which kept producing the same kind of error you see: the old container has no task executor bound to it
> (No TaskExecutor registered under containe_.).
> I ended up writing a script that reloads the application through a savepoint. Hope this helps.
>
> -----Original message-----
> From: xiao cai [mailto:flin...@163.com]
> Sent: Wednesday, August 19, 2020 12:50
> To: user-zh <user-zh@flink.apache.org>
> Subject: Flink on Yarn: after a task restarts, the TaskManager seems to lose the jar files in lib
>
> As the subject says: with Flink on Yarn, after a task restarts the TaskManager seems to lose the jar files in lib.
> My job is kafka source -> hbase sink.
> After the job obtains a new container, class files that existed before are missing when the task starts. I suspect the newly requested container did not get the resources in lib. Should the resources in lib be placed on HDFS? How should that be configured?
>
> Best
> xiao cai
>
> Error stack:
> 2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource , 1 pending container requests.
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
> 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Creating container launch context for TaskManagers
> 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[]Priority[1].
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource .
> 2020-08-19 11:23:08,102 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_69
> 2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_68.
>     at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

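Re: A question about historical data with CDC in Flink 1.11

2020-08-24 Thread taochanglian

It should be. From the source code you can see that flink-json currently supports parsing two built-in JSON formats: one is canal, the other is debezium.

For details see:
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema and
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema


On 2020/8/24 17:27, dixingxin...@163.com wrote:

Hi all:
CDC in Flink 1.11 supports loading historical data. There are two things I would like to confirm:
1. Is debezium used under the hood to load the historical data?
2. Does debezium load the full historical data by querying the source table over JDBC? Could this put a sudden load on the database?

I would appreciate your help with this, thanks.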


Best,
Xingxing Di




Re: Flink checkpointing with Azure block storage

2020-08-24 Thread Boris Lublinsky
Thanks Piyush,
The thing that I was missing is this.
Now it all works


> On Aug 24, 2020, at 2:44 PM, Piyush Narang  wrote:
> 
> We had something like this when we were setting it in our code (now we’re
> passing it via config). There’s likely a better / cleaner way:
> private def configureCheckpoints(env: StreamExecutionEnvironment,
>                                  checkpointPath: String): Unit = {
>   if (checkpointPath.startsWith("wasb")) {
>     import org.apache.hadoop.fs.{Path => HPath}
>     import org.apache.flink.configuration.Configuration
>     import org.apache.flink.core.fs.FileSystem
>
>     val jobCheckpointsPath = new HPath(checkpointPath)
>     val conf = new Configuration()
>     conf.setString(
>       "fs.azure.account.key.storage-account.blob.core.windows.net",
>       "access-key"
>     )
>     // this ensures the AzureFS is initialized and with correct creds
>     FileSystem.initialize(conf)
>   }
>   // other checkpoint config stuff
> }
>  
> -- Piyush
>  
>  
> From: Boris Lublinsky 
> Date: Saturday, August 22, 2020 at 10:08 PM
> To: Yun Tang 
> Cc: user 
> Subject: Re: Flink checkpointing with Azure block storage
>  
> Thanks Yun,
> I made it work, but now I want to set the appropriate config programmatically.
> I can set state.checkpoints.dir by:
>  
> val fsStateBackend = new FsStateBackend(new URI("wasb://@$.blob.core.windows.net/"))
> env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])
>  
> But I can’t update the configuration to add the credentials
> fs.azure.account.key..blob.core.windows.net:
> because getConfiguration is a private method. Any suggestions?
> 
> 
> 
> 
>  
> 
> 
> On Aug 20, 2020, at 9:29 PM, Yun Tang wrote:
>  
> Hi Boris
>  
> I think the official guide [1] should be enough to tell you how to configure.
> However, I think your changes to flink-conf.yaml might not have taken effect:
> you have configured the state backend as 'filesystem', while the logs still
> tell us that "No state backend has been configured, using default (Memory /
> JobManager) MemoryStateBackend".
>  
> You can search the log for "Loading configuration property" to see whether
> your changes were loaded.
>  
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration
>  
> 
>  
> Best
> Yun Tang
>  
> From: Boris Lublinsky
> Sent: Friday, August 21, 2020 7:18
> To: user <user@flink.apache.org>
> Subject: Re: Flink checkpointing with Azure block storage
>  
> To test it, I created a flink-conf.yaml file and put it in the resource
> directory of my project.
> The file contains the following:
>  
> #==
> # Fault tolerance and checkpointing
> #==
> 
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
> # .
> #
> state.backend: filesystem
> 
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends.
> #
> state.checkpoints.dir: wasb://@$.blob.core.windows.net/
> 
> fs.azure.account.key..blob.core.windows.net:
> 
> # Default target directory for savepoints, optional.
> #
> # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
> 
> # Flag to enable/disable incremental checkpoints for backends that
>  
> Which should have produced an error,
>  
> But what I see is that it does not seem to take effect:
>  
>  
> 313 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been 
> configured, using default (Memory / JobManager) MemoryStateBackend (data in 
> heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
> 'null', asynchronous: TRUE, maxStateSize: 5242880)
> 3327 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb
>  for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
> 3329 [flink-akka.actor.default-dispatcher-3] INFO  
> org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService
>   - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3
>  
> 
> 
> On Aug 20, 2020, at 5:14 PM, Boris Lublinsky wrote:
>  
> Is there a complete configuration example for this option somewhere?
>  



Re: Flink Couchbase

2020-08-24 Thread Vijayendra Yadav
Actually, I got this message in the rolled-over container logs:

[org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "cb-timer-1-1" java.lang.OutOfMemoryError: Metaspace
Exception in thread "Thread-16" java.lang.OutOfMemoryError: Metaspace
Exception in thread "TransientBlobCache shutdown hook"
java.lang.OutOfMemoryError: Metaspace
Exception in thread "FileChannelManagerImpl-io shutdown hook"
java.lang.OutOfMemoryError: Metaspace
Exception in thread "Kafka Fetcher for Source: flink-kafka-consumer ->
Map -> Filter -> Map -> Sink: s3-sink-raw (2/3)"
java.lang.OutOfMemoryError: Metaspace
Exception in thread "FileCache shutdown hook"
java.lang.OutOfMemoryError: Metaspace

Any suggestions on how to fix it ?



On Mon, Aug 24, 2020 at 12:53 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
> I am running a Flink job on YARN and trying to connect to a Couchbase DB
> in one of the map functions of my Flink streaming job. But my TaskManager
> containers keep failing and new containers keep being assigned, which gives
> me no opportunity to collect any useful logs.
>
>  val cluster = Cluster.connect("host", "user", "pwd")
>  val bucket = cluster.bucket("bucket")
>  val collection = bucket.defaultCollection
>
> Only thing I see is yarn exception:
>
> java.lang.Exception: Container released on a *lost* node
>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Could you please provide any insight on how to get the logs, and on why a
> simple connection does not work?
>
> Note: it works on YARN on my local system.
>
> Regards,
> Vijay
>


Flink Couchbase

2020-08-24 Thread Vijayendra Yadav
Hi Team,

I am running a Flink job on YARN and trying to connect to a Couchbase DB
in one of the map functions of my Flink streaming job. But my TaskManager
containers keep failing and new containers keep being assigned, which gives
me no opportunity to collect any useful logs.

 val cluster = Cluster.connect("host", "user", "pwd")
 val bucket = cluster.bucket("bucket")
 val collection = bucket.defaultCollection

Only thing I see is yarn exception:

java.lang.Exception: Container released on a *lost* node
    at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Could you please provide any insight on how to get the logs, and on why a
simple connection does not work?

Note: it works on YARN on my local system.

Regards,
Vijay


Re: Flink checkpointing with Azure block storage

2020-08-24 Thread Piyush Narang
We had something like this when we were setting it in our code (now we’re
passing it via config). There’s likely a better / cleaner way:
private def configureCheckpoints(env: StreamExecutionEnvironment,
                                 checkpointPath: String): Unit = {
  if (checkpointPath.startsWith("wasb")) {
    import org.apache.hadoop.fs.{Path => HPath}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.FileSystem

    val jobCheckpointsPath = new HPath(checkpointPath)
    val conf = new Configuration()
    conf.setString(
      "fs.azure.account.key.storage-account.blob.core.windows.net",
      "access-key"
    )
    // this ensures the AzureFS is initialized and with correct creds
    FileSystem.initialize(conf)
  }
  // other checkpoint config stuff
}

-- Piyush


From: Boris Lublinsky 
Date: Saturday, August 22, 2020 at 10:08 PM
To: Yun Tang 
Cc: user 
Subject: Re: Flink checkpointing with Azure block storage

Thanks Yun,
I made it work, but now I want to set the appropriate config programmatically.
I can set state.checkpoints.dir by:


val fsStateBackend = new FsStateBackend(new URI("wasb://@$.blob.core.windows.net/"))
env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])

But I can’t update the configuration to add the credentials
fs.azure.account.key..blob.core.windows.net:
because getConfiguration is a private method. Any suggestions?







On Aug 20, 2020, at 9:29 PM, Yun Tang <myas...@live.com> wrote:

Hi Boris

I think the official guide [1] should be enough to tell you how to configure.
However, I think your changes to flink-conf.yaml might not have taken effect:
you have configured the state backend as 'filesystem', while the logs still tell
us that "No state backend has been configured, using default (Memory /
JobManager) MemoryStateBackend".

You can search the log for "Loading configuration property" to see whether your
changes were loaded.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration

Best
Yun Tang


From: Boris Lublinsky <boris.lublin...@lightbend.com>
Sent: Friday, August 21, 2020 7:18
To: user <user@flink.apache.org>
Subject: Re: Flink checkpointing with Azure block storage

To test it, I created a flink-conf.yaml file and put it in the resource
directory of my project.
The file contains the following:


#==
# Fault tolerance and checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# .
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: wasb://@$.blob.core.windows.net/

fs.azure.account.key..blob.core.windows.net:

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that

Which should have produced an error,

But what I see is that it does not seem to take effect:


313 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been 
configured, using default (Memory / JobManager) MemoryStateBackend (data in 
heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 
'null', asynchronous: TRUE, maxStateSize: 5242880)
3327 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb
 for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
3329 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  
- Proposing leadership to contender akka://flink/user/rpc/jobmanager_3



On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:

Is there a complete configuration example for this option somewhere?



Re: Debezium Flink EMR

2020-08-24 Thread Rex Fenley
Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira 
wrote:

> Yes — you'll get the full row in the payload; and you can also access the
> change operation, which might be useful in your case.
>
> About performance, I'm summoning Kurt and @Jark Wu  to
> the thread, who will be able to give you a more complete answer and likely
> also some optimization tips for your specific use case.
>
> Marta
>
> On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:
>
>> Yup! This definitely helps and makes sense.
>>
>> The 'after' payload comes with all data from the row, right? So
>> essentially, for inserts and updates I can insert/replace data by pk, for
>> null values I just delete by pk, and then I can build out the rest of my
>> joins like normal.
>>
>> Are there any performance implications of doing it this way compared with
>> the out-of-the-box 1.11 solution?
>>
>> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira 
>> wrote:
>>
>>> Hi, Rex.
>>>
>>> Part of what enabled CDC support in Flink 1.11 was the refactoring of
>>> the table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>>> [2], which allows emitting bounded/unbounded streams with insert, update
>>> and delete rows.
>>>
>>> In theory, you could consume data generated with Debezium as regular
>>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>>> to really treat it as "changelog". As a workaround, what you can do in
>>> Flink 1.10 is process these messages as JSON and extract the "after" field
>>> from the payload, and then apply de-duplication [3] to keep only the last
>>> row.
>>>
>>> The DDL for your source table would look something like:
>>>
>>> CREATE TABLE tablename (
>>>   ...
>>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'json',
>>>   ...
>>> );
>>>
>>> Hope this helps!
>>>
>>> Marta
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>>
>>>
>>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>>
>>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>>> in Flink 1.11.0, from looking at the documentation.
>>>>
>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>>
>>>> I'm wondering what alternative solutions are available for connecting
>>>> Debezium to Flink? Is there an open source Debezium connector that works
>>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>>
>>>> For context, I plan on doing some fairly complicated long lived
>>>> stateful joins / materialization using the Table API over data ingested
>>>> from Postgres and possibly MySQL.
>>>>
>>>> Appreciate any help, thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US
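
The workaround discussed in this thread (read the Debezium messages as plain JSON, take the "after" payload, insert or replace by primary key, and delete by primary key when "after" is null) can be sketched in plain Java as below. This is only an illustration under assumptions: the `Change` class, its `pk` field and the `materialize` helper are hypothetical names, no Flink or Debezium API is used, and the primary key is assumed to be available separately from the "after" payload.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LastRowByKey {

    // Hypothetical, simplified change event: the primary key plus the
    // "after" row image, which is null for a delete.
    static final class Change {
        final String pk;
        final Map<String, Object> after;

        Change(String pk, Map<String, Object> after) {
            this.pk = pk;
            this.after = after;
        }
    }

    // Applies the events in order and keeps only the last row per key.
    static Map<String, Map<String, Object>> materialize(List<Change> changes) {
        Map<String, Map<String, Object>> table = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.after == null) {
                table.remove(c.pk);       // delete by primary key
            } else {
                table.put(c.pk, c.after); // insert or replace by primary key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change("1", Collections.<String, Object>singletonMap("name", "a")),
                new Change("1", Collections.<String, Object>singletonMap("name", "b")),
                new Change("2", Collections.<String, Object>singletonMap("name", "c")),
                new Change("2", null));
        // prints {1={name=b}}
        System.out.println(materialize(changes));
    }
}
```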



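Re: Checkpoint failures in a stream processing job

2020-08-24 Thread Robert.Zhang
I looked at the logs: some checkpoints time out before completing, and in the web UI the checkpoint of the iteration source never completes.
The official documentation does not say much more about checkpoints in iterative streams. I can understand losing data that is inside the loop, but I don't quite understand why the checkpoint cannot succeed.
As I understand the Chandy-Lamport algorithm, the barrier of an upstream operator should be passed directly to the downstream operator, so there should be no case where the barrier cannot be received. I don't know what is causing this.

---Original message---
From: "Congxian Qiu"
https://zhuanlan.zhihu.com/p/87131964
Best,
Congxian


Robert.Zhang <173603...@qq.com wrote on Fri, Aug 21, 2020 at 6:31 PM:

 Hello all,
 I have run into a problem: I use checkpoints in an iterative stream job and
 configured them according to the documentation, but during testing the checkpoints almost never succeed.
 The test state is tiny, only a few KB, and it still fails. The error is org.apache.flink.util.FlinkRuntimeException:
 Exceeded checkpoint tolerable failure threshold.


 The configuration is as follows:
 env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
 CheckpointConfig checkpointConfig = env.getCheckpointConfig();
 checkpointConfig.setCheckpointTimeout(60);
 checkpointConfig.setMinPauseBetweenCheckpoints(6);
 checkpointConfig.setMaxConcurrentCheckpoints(4);

 checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 checkpointConfig.setPreferCheckpointForRecovery(true);
 checkpointConfig.setTolerableCheckpointFailureNumber(2);
 checkpointConfig.enableUnalignedCheckpoints();


 The job only processes a few records and there is no backpressure. Has anyone run into a similar problem?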

Re: How jobmanager and task manager communicates with each other ?

2020-08-24 Thread sidhant gupta
++d...@flink.apache.org

On Mon, Aug 24, 2020, 7:31 PM sidhant gupta  wrote:

> Hi User
>
> How do the JobManager and TaskManager communicate with each other? How do
> I set up the connection between a JobManager and a TaskManager running on
> different or the same EC2 instance? Is it HTTP or TCP? How does service
> discovery work?
>
> Thanks
> Sidhant Gupta
>


Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-24 Thread Vijayendra Yadav
Hi Team,

Bulk formats can only use `OnCheckpointRollingPolicy`, which rolls (ONLY)
on every checkpoint.

*.withRollingPolicy(OnCheckpointRollingPolicy.build())*

Question: What are the recommended checkpointing values when checkpointing to
the fs state backend? Should checkpoints be more frequent or use longer
intervals, how many concurrent checkpoints should be allowed, and what is an
ideal pause between checkpoints?
Is there a way to control the batch size here other than by time? Any
recommendations for the parameters listed below?
*Note: *I am trying to improve sink throughput.


env.enableCheckpointing(chckptintervalmilli)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
 env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)

Thanks,
Vijay
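
Because part files roll only on checkpoints with this policy, the checkpoint interval effectively sets the batch size. A rough back-of-the-envelope sketch of that relationship follows. The class and method names are hypothetical, and it assumes an even load across sink subtasks, one active bucket per subtask and no compression; it is an estimate, not a Flink API.

```java
public class PartFileSizeEstimate {

    // Estimated bytes per part file = bytes one sink subtask receives
    // between two consecutive checkpoints (assumptions in the lead-in).
    static long estimatePartFileBytes(long totalBytesPerSecond,
                                      int sinkParallelism,
                                      long checkpointIntervalMillis) {
        if (sinkParallelism <= 0) {
            throw new IllegalArgumentException("sinkParallelism must be positive");
        }
        return totalBytesPerSecond * checkpointIntervalMillis / 1000 / sinkParallelism;
    }

    public static void main(String[] args) {
        long fiftyMiBPerSecond = 50L * 1024 * 1024;
        // 50 MiB/s in total, 4 sink subtasks, checkpoint every 60 s
        // prints 786432000 (750 MiB per part file)
        System.out.println(estimatePartFileBytes(fiftyMiBPerSecond, 4, 60_000L));
    }
}
```

Under these assumptions, doubling the checkpoint interval doubles the part file size, while doubling the sink parallelism halves it.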


[DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Aljoscha Krettek

Hi all,

this thought came up on FLINK-17260 [1] but I think it would be a good 
idea in general. The issue reminded us that Kafka didn't have an 
idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have 
had the "modern" Kafka connector that roughly follows new Kafka releases 
for a while and this one supports Kafka cluster versions as far back as 
0.10.2.0 (I believe).


What are your thoughts on removing support for older Kafka versions? And 
yes, I know that we had multiple discussions like this in the past but 
I'm trying to gauge the current sentiment.


I'm cross-posting to the user-ml since this is important for both users 
and developers.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-17260


How jobmanager and task manager communicates with each other ?

2020-08-24 Thread sidhant gupta
Hi User

How do the JobManager and TaskManager communicate with each other? How do
I set up the connection between a JobManager and a TaskManager running on
different or the same EC2 instance? Is it HTTP or TCP? How does service
discovery work?

Thanks
Sidhant Gupta


Idle stream does not advance watermark in connected stream

2020-08-24 Thread Truong Duc Kien
Hi all,
We are testing the new idleness detection feature in Flink 1.11. However,
it does not work as we expected:
when we connect two data streams, one of which is idle, the output
watermark of the CoProcessOperator does not advance, so the program cannot
make progress.

I've made a small project to illustrate the problem. The watermark received
by the sink does not increase at all until the idle source is stopped.

https://github.com/kien-truong/flink-idleness-testing

Is this a bug or does the idleness detection not support this use case ?

Regards.
Kien
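
For reference, the behaviour expected in this report (the combined watermark of a two-input operator is the minimum over the inputs that are not idle) can be written down as a small sketch. This is plain Java, not Flink code: the `combine` helper and the use of `Long.MIN_VALUE` for "no watermark yet" are assumptions made for the illustration.

```java
public class CombinedWatermark {

    // Expected rule: take the minimum watermark over all inputs that are
    // not marked idle. If every input is idle, return Long.MIN_VALUE,
    // which this sketch uses to mean "nothing to emit".
    static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {1000L, Long.MIN_VALUE};
        // Second input is idle, so it should not hold back the first one.
        System.out.println(combine(watermarks, new boolean[] {false, true})); // prints 1000
        // Second input is not idle: the output stays at its (missing) watermark.
        System.out.println(combine(watermarks, new boolean[] {false, false}));
    }
}
```

The second call mirrors the symptom described above: if the idle input is not excluded, the output watermark cannot advance.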


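Re: Will flink-sql-gateway still be updated?

2020-08-24 Thread godfrey he
We will add Flink 1.11 support to flink-sql-gateway this week, please stay tuned.
Also, as far as I know, there is currently no plan for sql-client to support gateway mode.

shougou <80562...@qq.com> wrote on Mon, Aug 24, 2020 at 9:48 AM:

> I have the same question. I would also like to ask: in which version is the SQL client planned to support gateway mode? Many thanks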
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink log4j2 issue

2020-08-24 Thread godfrey he
Check the versions of your log4j2-related dependencies; see [1].

[1]
https://stackoverflow.com/questions/50970960/facing-issue-with-log4j2-java-lang-exceptionininitializererror

guaishushu1...@163.com wrote on Mon, Aug 24, 2020 at 11:18 AM:

> Does this kind of problem occur when submitting SQL???
> Caused by: java.lang.IllegalArgumentException: Initial capacity must be at
> least one but was 0
> at
> org.apache.logging.log4j.util.SortedArrayStringMap.(SortedArrayStringMap.java:102)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.createContextData(ContextDataFactory.java:109)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.(ContextDataFactory.java:57)
> ... 29 more
>
>
>
> guaishushu1...@163.com
>


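Re: Hadoop conflict caused by the hive-exec dependency

2020-08-24 Thread Rui Li
Hi,

hive-exec itself does not contain Hadoop. If Hadoop is pulled in through Maven transitive dependencies, you can exclude it when packaging. The Hadoop version used at runtime can be your cluster's Hadoop version rather than the Hadoop version Hive itself depends on. For Flink
1.11 you can also consider the officially provided flink-sql-connector-hive uber
jar, which contains all Hive dependencies (the Hadoop dependencies still need to be added separately). For more details, please refer to the documentation [1][2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes

On Mon, Aug 24, 2020 at 9:05 PM amen...@163.com  wrote:

>
> One more note: when I removed the Hadoop dependencies that hive-exec and others bring into my program, the job still failed, so maybe I missed something somewhere. I suspected a dependency conflict because, before testing the Hive integration, I had submitted jobs to YARN without any exception, which is why my troubleshooting led me to Hive.
> Now it looks like some other cause may be responsible. Here is a bit of the exception stack: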
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
> ... 19 more
> Caused by: java.lang.ClassCastException:
> org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto
> cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
> at
> org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
> ... 24 more
>
> best,
> amenhub
>
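> From: amen...@163.com
> Sent: 2020-08-24 20:40
> To: user-zh
> Subject: hive-exec dependency causes Hadoop conflict
> hi, everyone
>
> Component versions: flink-1.11.1, hive-2.1.1
>
> Problem description:
> I wrote a kafka2mysql real-time demo program using the Table
> API's executeSql() method. Without the hive-exec dependency, it packages, submits to the YARN cluster, and runs normally.
>
> When testing HiveCatalog and reading/writing Hive tables, the job runs without exceptions on a Standalone Cluster, and Flink reads and writes Hive
> tables normally (no Hadoop dependency conflict occurs).
>
> When submitted to YARN, however, a Hadoop conflict occurs. Inspecting the program's dependencies in IDEA shows that adding hive-exec automatically pulls in Hadoop- and HDFS-related dependencies at version 2.6.1, which conflict with the Hadoop dependencies of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).
>
>
> Has anyone in the community run into this? The docs suggest that when there is no officially provided Hive bundle for your version, you should download the hive-exec dependency that matches your own version. In that case, though, Hadoop dependencies that differ from the cluster's version are pulled in implicitly, which is bound to cause conflicts. Is something misconfigured on my side?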
>
> best,
> amenhub
>


-- 
Best regards!
Rui Li


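Re: hive-exec dependency causes Hadoop conflict

2020-08-24 Thread amen...@163.com
A follow-up: after I removed the Hadoop dependencies that hive-exec and the other artifacts bring into the program, the job still fails, so I may have missed something somewhere. I suspected a dependency conflict because I had submitted to YARN without any exception before testing the Hive integration, which is what led my troubleshooting to Hive.
It now looks as though something else may be the cause. Part of the exception stack follows: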

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
... 19 more
Caused by: java.lang.ClassCastException: 
org.apache.hadoop.yarn.proto.YarnServiceProtos$GetClusterNodesRequestProto 
cannot be cast to org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy63.getClusterNodes(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:311)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy64.getClusterNodes(Unknown Source)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:618)
at 
org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever.getMaxVcores(YarnClientYarnClusterInformationRetriever.java:43)
at 
org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:280)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
... 24 more

best,
amenhub
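
The ClassCastException above names org.apache.hadoop.hbase.shaded.com.google.protobuf.Message, which suggests (an inference from the trace, not something confirmed in this thread) that a relocated copy of Hadoop's RPC classes, for example from an HBase shaded jar, is being picked up instead of the cluster's own. A minimal Java sketch for checking which jar a class is actually loaded from; ClassOrigin and locate are hypothetical names, and only standard JDK reflection calls are used:

```java
import java.security.CodeSource;

/** Hypothetical helper: reports where a class was loaded from. */
public final class ClassOrigin {

    private ClassOrigin() {}

    /** Returns the code-source location of the named class, or a marker string. */
    public static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "<bootstrap or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not found>";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above.
        String[] names = {
            "org.apache.hadoop.ipc.ProtobufRpcEngine",
            "org.apache.hadoop.yarn.proto.YarnServiceProtos",
            "com.google.protobuf.Message"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run with the same classpath as the Flink client, this should show whether ProtobufRpcEngine comes from the cluster's Hadoop jars or from an unexpected jar.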
 
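From: amen...@163.com
Sent: 2020-08-24 20:40
To: user-zh
Subject: hive-exec dependency causes Hadoop conflict
hi, everyone
 
Component versions: flink-1.11.1, hive-2.1.1
 
Problem description:
I wrote a kafka2mysql real-time demo program using the Table
API's executeSql() method. Without the hive-exec dependency, it packages, submits to the YARN cluster, and runs normally.
 
When testing HiveCatalog and reading/writing Hive tables, the job runs without exceptions on a Standalone Cluster, and Flink reads and writes Hive
tables normally (no Hadoop dependency conflict occurs).
When submitted to YARN, however, a Hadoop conflict occurs. Inspecting the program's dependencies in IDEA shows that adding hive-exec automatically pulls in Hadoop- and HDFS-related dependencies at version 2.6.1, which conflict with the Hadoop dependencies of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).
 
Has anyone in the community run into this? The docs suggest that when there is no officially provided Hive bundle for your version, you should download the hive-exec dependency that matches your own version. In that case, though, Hadoop dependencies that differ from the cluster's version are pulled in implicitly, which is bound to cause conflicts. Is something misconfigured on my side?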
 
best,
amenhub


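Re: Diagnosing a Flink TaskManager killed by YARN for exceeding the container memory limit

2020-08-24 Thread (sender name lost to character-encoding corruption)
[Message body lost to character-encoding corruption; the surviving fragments mention blink and flink 1.11.]
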

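Re: Flink job writing to Kafka with EOS enabled and RocksDB state backend: under backpressure the source-side log backlog builds up and recovery from a checkpoint fails

2020-08-24 Thread Congxian Qiu
Hi
   If this file does not exist, that checkpoint most likely did not complete successfully, so restoring from it will fail. For now the community only supports
stop with savepoint. If you want to restore from a checkpoint, you can only restore from one that was produced earlier, and if that
checkpoint is already some time old, a fair amount of data will be replayed. There is an existing issue, FLINK-12619, that attempts to add stop with
checkpoint (which would reduce the replayed data). If you need this, feel free to comment on the issue.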
Best,
Congxian


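On Wed, Aug 19, 2020 at 3:03 PM Yang Peng wrote:

>
> Thank you, Congxian. I checked, and this file does not exist. Compared with the chk directory of the same job that is currently running normally, the chk-167 directory contains far fewer files. At the time we watched the checkpoint complete, then cancelled the job,
> looked up this checkpoint path on HDFS, and restarted the job from it.
>
> On Wed, Aug 19, 2020 at 2:39 PM Congxian Qiu wrote:
>
> > Hi
> > 1. The image did not come through.
> > 2. Can you find the file hdfs:*xx*/flink/checkpoints/
> > 7226f43179649162e6bae2573a952e60/chk-167/_metadata on HDFS?
> > Best,
> > Congxian
> >
> >
> > On Mon, Aug 17, 2020 at 5:47 PM Yang Peng wrote:
> >
> > > Found it. The detailed log is as follows: 2020-08-13 19:45:21,932 ERROR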
> > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> > > occurred in the cluster entrypoint.
> > >
> > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
> > leadership with session id 98a2a688-266b-4929-9442-1f0b559ade43.
> > >   at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
> > >   at
> >
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > >   at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > >   at
> >
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:691)
> > >   at
> >
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > >   at
> >
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> > >   at
> >
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> > >   at
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > >   at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> > >   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > >   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > >   at
> > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > >   at akka.japi.pf
> > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > >   at
> > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > >   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > >   at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > >   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > >   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > >   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > >   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > >   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > >   at
> > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >   at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >   at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >   at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: java.lang.RuntimeException:
> > org.apache.flink.runtime.client.JobExecutionException: Could not set up
> > JobManager
> > >   at
> >
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> > >   at
> >
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > >   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > >   at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > >   ... 4 more
> > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> > not set up JobManager
> > >   at
> >
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
> > >   at
> >
> 

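hive-exec dependency causes Hadoop conflict

2020-08-24 Thread amen...@163.com
hi, everyone

Component versions: flink-1.11.1, hive-2.1.1

Problem description:
I wrote a kafka2mysql real-time demo program using the Table
API's executeSql() method. Without the hive-exec dependency, it packages, submits to the YARN cluster, and runs normally.

When testing HiveCatalog and reading/writing Hive tables, the job runs without exceptions on a Standalone Cluster, and Flink reads and writes Hive
tables normally (no Hadoop dependency conflict occurs).
When submitted to YARN, however, a Hadoop conflict occurs. Inspecting the program's dependencies in IDEA shows that adding hive-exec automatically pulls in Hadoop- and HDFS-related dependencies at version 2.6.1, which conflict with the Hadoop dependencies of the YARN cluster (hadoop-3.0.0-cdh-6.2.0).

Has anyone in the community run into this? The docs suggest that when there is no officially provided Hive bundle for your version, you should download the hive-exec dependency that matches your own version. In that case, though, Hadoop dependencies that differ from the cluster's version are pulled in implicitly, which is bound to cause conflicts. Is something misconfigured on my side?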

best,
amenhub


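Re: Re: Flink on YARN: after the job restarts, the TaskManager seems to lose the jar files in lib

2020-08-24 Thread Congxian Qiu
Hi
   In theory, if the job starts successfully the first time, later failovers should also recover normally. Can you reproduce this reliably? If so, it may be a bug.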
Best,
Congxian


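On Thu, Aug 20, 2020 at 2:27 PM xiao cai wrote:

> Hi:
> Thanks for the reply; that is indeed one approach.
>
> Personally, though, I feel that if the jar files in the local lib directory were uploaded to HDFS when the first container starts, and the containers created on later failovers all fetched them from HDFS, this problem should not occur. It seems the community optimized jar copying in 1.11; I am still reading up on that and will follow up when I have progress.
>
>
> Best,
> xiao cai
>
>
>  Original message
> From: 范超
> To: user-zh@flink.apache.org
> Sent: Thursday, August 20, 2020, 09:11
> Subject: Re: Flink on YARN: after the job restarts, the TaskManager seems to lose the jar files in lib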
>
>
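> I previously enabled failover restart for the job and saw the same thing: YARN simply requests a new container,
> the old container is not handled any further, and the result is the same error you are seeing, with no task
> executor bound to the old container: No TaskExecutor registered under containe_.
> In the end I just wrote a script that reloads the application via a savepoint. Hope this helps.
>
> -Original message-
> From: xiao cai [mailto:flin...@163.com]
> Sent: Wednesday, August 19, 2020, 12:50
> To: user-zh <user-zh@flink.apache.org>
> Subject: Flink on YARN: after the job restarts, the TaskManager seems to lose the jar files in lib
>
> As the subject says: when a job is started with Flink on YARN, the TaskManager seems to lose the jar files in lib after the job restarts.
> My job is kafka source -> hbase sink.
> After the job obtains a new container, class files that were originally present are missing when the job starts. I suspect the newly requested container did not get the resources in lib. Should the resources in lib be put on HDFS? How should this be configured?
> Best xiao cai
>
> Error stack:
> 2020-08-19 11:23:08,099 INFO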
> org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Received 1 containers with resource , 1 pending
> container requests. 2020-08-19 11:23:08,100 INFO
> org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor
> container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22
> with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb
> (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Creating container launch
> context for TaskManagers 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Removing container request Capability[]Priority[1].
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Accepted 1 requested containers, returned 0 excess containers, 0 pending
> container requests of resource . 2020-08-19
> 11:23:08,102 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] -
> Processing Event EventType: START_CONTAINER for Container
> container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under
> container_e07_1596440446172_0094_01_68. at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_191] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> 

Re: Incremental checkpoint

2020-08-24 Thread Congxian Qiu
Hi
  Sharing an article that explains incremental checkpoints [1].

[1]
https://ververica.cn/developers/manage-large-state-incremental-checkpoint/
Best,
Congxian


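On Fri, Aug 21, 2020 at 12:09 AM Yun Tang wrote:

> Hi
>
> Incremental checkpoints are not directly related to what the web UI shows. The bookkeeping for incremental checkpoints is done by reference counting in the SharedStateRegistry [1] inside the CheckpointCoordinator,
> while the number of retained checkpoints is managed by the CheckpointStore [2].
> With 2 retained checkpoints, execution proceeds as follows: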
> chk-1 completed --> register chk-1 in state registry --> add to checkpoint
> store
> chk-2 completed --> register chk-2 in state registry --> add to checkpoint
> store
> chk-3 completed --> register chk-3 in state registry --> add to checkpoint
> store --> chk-1 subsumed --> unregister chk-1 in state registry --> discard
> state with reference=0
> chk-4 completed --> register chk-4 in state registry --> add to checkpoint
> store --> chk-2 subsumed --> unregister chk-2 in state registry --> discard
> state with reference=0
>
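> The flow above shows the whole process. When chk-3
> still depends on some data from chk-1, the reference count of that state does not drop to 0 when chk-1 is unregistered, so it is not deleted, and it does not need to be copied into the current checkpoint either.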
>
>
> [1]
> https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L192
> [2]
> https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java#L41
>
> Best regards,
> 唐云 (Yun Tang)
>
>
> 
> From: 赵一旦 
> Sent: Thursday, August 20, 2020 10:50
> To: user-zh@flink.apache.org 
> Subject: Re: 增量che ckpoint
>
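> Let's wait for someone else to give the definitive answer. Here is my guess:
> Retaining 2 checkpoints means the web UI keeps 2 checkpoints. With incremental checkpoints, every historical checkpoint referenced by those 2 checkpoints will certainly not be deleted.
> So at the third checkpoint, as long as checkpoint 2 or 3 still references checkpoint 1, checkpoint 1 will not be deleted.
>
> On Thu, Aug 20, 2020 at 10:46 AM superainbower wrote:
>
> > Hi, a question for everyone: with incremental checkpoints enabled and the number of retained checkpoints set to 2, what happens if the third checkpoint
> > still depends on the first one? Is the first checkpoint's data copied into the current one? And if the first checkpoint is not deleted, doesn't that violate the limit of retaining 2?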
>
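
The register/subsume flow that Yun Tang describes can be sketched as a toy model in Java. This is an illustration of reference counting under stated assumptions, not Flink's actual SharedStateRegistry or CheckpointStore: SharedStateDemo, complete, and the file names "a" to "e" are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model: shared-state reference counting with a bounded checkpoint store. */
public final class SharedStateDemo {
    private final Map<String, Integer> refCounts = new HashMap<>(); // shared file -> reference count
    private final Deque<Set<String>> retained = new ArrayDeque<>(); // completed checkpoints, oldest first
    private final List<String> deleted = new ArrayList<>();         // files whose count reached 0
    private final int maxRetained;

    public SharedStateDemo(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Registers a completed checkpoint, then subsumes the oldest one if over the limit. */
    public void complete(Set<String> files) {
        for (String f : files) {
            refCounts.merge(f, 1, Integer::sum);
        }
        retained.addLast(files);
        if (retained.size() > maxRetained) {
            for (String f : retained.removeFirst()) {
                int left = refCounts.merge(f, -1, Integer::sum);
                if (left == 0) {          // nothing references this file any more
                    refCounts.remove(f);
                    deleted.add(f);
                }
            }
        }
    }

    public List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        SharedStateDemo demo = new SharedStateDemo(2);
        demo.complete(new HashSet<>(Arrays.asList("a", "b")));      // chk-1
        demo.complete(new HashSet<>(Arrays.asList("a", "b", "c"))); // chk-2
        demo.complete(new HashSet<>(Arrays.asList("a", "c", "d"))); // chk-3 reuses "a"; chk-1 subsumed
        System.out.println("after chk-3, deleted = " + demo.deleted());
        demo.complete(new HashSet<>(Arrays.asList("c", "d", "e"))); // chk-4; chk-2 subsumed
        System.out.println("after chk-4, deleted = " + demo.deleted());
    }
}
```

In this model chk-1 leaves the store when chk-3 completes, yet file "a" survives because chk-2 and chk-3 still reference it; nothing is copied, and the retention limit applies to checkpoints rather than to the files they share.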


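Re: Details of a Flink JDBC read error: serialization error

2020-08-24 Thread Congxian Qiu
Hi
   Judging from the error, CountDownLatch cannot be serialized: the class does not implement the Serializable
interface. You can try the approach described here [1].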

[1]
https://stackoverflow.com/questions/4551926/java-io-notserializableexception-while-writing-serializable-object-to-external-s/4552014
Best,
Congxian


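On Mon, Aug 24, 2020 at 3:34 PM 引领 wrote:

>
> Scenario: Flink 1.11.1
> reading from MySQL keeps failing with a serialization error, even though the bean objects that need to be serialized already appear to be serializable. I am completely stuck and would appreciate any help. The code is in the attachment.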
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException:
> java.util.concurrent.CountDownLatch@45ca843[Count = 2] is not
> serializable. The object probably contains or references non serializable
> fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
> at com.hsq.APP.main(APP.java:43)
> Caused by: java.io.NotSerializableException:
> java.util.concurrent.CountDownLatch
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
> ... 9 more
>
> 引领
> yrx73...@163.com
>
>
>
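
The attached code is not visible in this thread, so the following is only a guess at the pattern involved: a serializable object that holds a CountDownLatch field. A minimal Java sketch of one common fix, marking the field transient and re-creating it lazily; LatchHolder, latch, and roundTrip are hypothetical names, and roundTrip merely stands in for Flink shipping a function to the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

/** Hypothetical source-like class that holds a non-serializable helper. */
public class LatchHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int parties;
    // transient: skipped by Java serialization, so it must be re-created after deserialization
    private transient CountDownLatch latch;

    public LatchHolder(int parties) {
        this.parties = parties;
    }

    /** Lazily creates the latch (in a Flink rich function, open() is a typical place for this). */
    public CountDownLatch latch() {
        if (latch == null) {
            latch = new CountDownLatch(parties);
        }
        return latch;
    }

    /** Serializes and deserializes an object using plain Java serialization. */
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        LatchHolder holder = new LatchHolder(2);
        holder.latch();                        // the latch exists before serialization
        LatchHolder copy = roundTrip(holder);  // without 'transient' this throws NotSerializableException
        System.out.println(copy.latch().getCount());
    }
}
```

The deserialized copy gets a fresh latch rather than the original one, which is usually what is wanted for per-task coordination objects.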


Re: Diagnosing a Flink TaskManager killed by YARN for exceeding the container memory limit

2020-08-24 Thread Congxian Qiu
Hi
   I am curious why you are testing on the Blink branch rather than on the latest 1.11.
Best,
Congxian


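On Mon, Aug 24, 2020 at 5:58 PM 柯四海 <2693711...@qq.com> wrote:

> Hi all,
> I am running some real-time jobs on a Flink build compiled from the Blink branch (1.5) on GitHub, and the TaskManager
> gets killed by YARN for exceeding the container memory limit.
> Does anyone have a good way to diagnose this?
>
> What I have tried so far, without success:
>   1. Increased the TaskManager memory.
>      Change: raised it from 8G to 36G and switched the state backend from FileSystem to RocksDB.
>      Result: the TaskManager ran several hours longer, but was still killed by YARN for exceeding memory.
>   2. Dumped the TaskManager heap to see which objects use the most memory.
>      Action: jmap -dump
>      Result: before the dump finished, the TaskManager was killed for missing heartbeats.
>      (I tried changing the heartbeat timeout, still without success.)
>   3. Used the debugging approach from the official documentation, but no dump file was produced.
>   4. Set containerized.heap-cutoff-ratio, hoping to trigger an OOM and thereby produce a dump file, but this parameter does not seem to take effect.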
>


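Re: Checkpoint failures in a streaming job

2020-08-24 Thread Congxian Qiu
Hi
   Judging from the error "Exceeded checkpoint tolerable failure threshold", your checkpoints
kept failing, which caused the job to fail. You need to find out why the checkpoints fail; this article [1] may help.
   Also, from the configuration I see you enabled unaligned checkpoints, which the article above does not yet cover.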

[1] https://zhuanlan.zhihu.com/p/87131964
Best,
Congxian


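On Fri, Aug 21, 2020 at 6:31 PM Robert.Zhang <173603...@qq.com> wrote:

> Hello all,
> I have run into a problem using checkpoints in an iterative stream job.
> I configured everything according to the documentation, but during testing the checkpoints almost never succeed.
> The test state is tiny, only a few KB, and it still fails with org.apache.flink.util.FlinkRuntimeException:
> Exceeded checkpoint tolerable failure threshold.
>
>
> The configuration is as follows: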
> env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.setCheckpointTimeout(60);
> checkpointConfig.setMinPauseBetweenCheckpoints(6);
> checkpointConfig.setMaxConcurrentCheckpoints(4);
>
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setPreferCheckpointForRecovery(true);
> checkpointConfig.setTolerableCheckpointFailureNumber(2);
> checkpointConfig.enableUnalignedCheckpoints();
>
>
> The job only processes a few records and there is no backpressure. Has anyone run into a similar problem?


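Re: Is it possible to use TiKV as a distributed backend for Flink?

2020-08-24 Thread Congxian Qiu
Hi
   A StateBackend can be thought of as a KV store plus a snapshot process, where the snapshot process is responsible for backing up the data currently in the KV
store. In theory any KV store could serve as a StateBackend, but adding a new StateBackend requires implementing the corresponding
snapshot/restore logic.

   However, sharing state across multiple Flink jobs is not supported in Flink.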
Best,
Congxian


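On Fri, Aug 21, 2020 at 6:33 PM wxpcc wrote:

> Our project has some requirements for sharing state between multiple Flink jobs.
>
> As in the subject: TiKV itself is built on RocksDB, so could it be extended into a distributed backend?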
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
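
The "KV store plus snapshot process" description above can be sketched as a toy Java class. This is only a conceptual illustration, not Flink's StateBackend interface; ToyKvState and its methods are hypothetical names, and a real backend would write snapshots to durable storage rather than keep them in memory.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration: a key-value store plus snapshot/restore. */
public final class ToyKvState {
    private Map<String, String> store = new HashMap<>();

    public void put(String key, String value) {
        store.put(key, value);
    }

    public String get(String key) {
        return store.get(key);
    }

    /** Snapshot: a point-in-time copy of the store's contents. */
    public Map<String, String> snapshot() {
        return new HashMap<>(store);
    }

    /** Restore: replace the current contents with a previously taken snapshot. */
    public void restore(Map<String, String> snapshot) {
        store = new HashMap<>(snapshot);
    }

    public static void main(String[] args) {
        ToyKvState state = new ToyKvState();
        state.put("count", "1");
        Map<String, String> checkpoint = state.snapshot();
        state.put("count", "2");     // progress made after the snapshot
        state.restore(checkpoint);   // simulated recovery after a failure
        System.out.println(state.get("count"));
    }
}
```

Any store that can provide these two extra operations consistently is a candidate in principle, which is the point made in the reply; sharing one store between jobs is a separate matter that this sketch does not address.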


Re: Setting job/task manager memory management in kubernetes

2020-08-24 Thread Yangze Guo
Hi, Sakshi

Could you provide more information about:
- What is the Flink version you are using? "taskmanager.heap.size" is
deprecated since 1.10[1].
- How do you deploy the cluster? In the approach of native k8s[2] or
the standalone k8s[3]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_migration.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/cluster_setup.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html

Best,
Yangze Guo

On Mon, Aug 24, 2020 at 6:31 PM Sakshi Bansal  wrote:
>
> Hello,
>
> I am trying to set the heap size of job and task manager when deploying the 
> job in kubernetes. I have set the jobmanager.heap.size and 
> taskmanager.heap.size. However, the custom values are not being used and it 
> is creating its own values and starting the job. How can I set custom values?
>
> --
> Thanks and Regards
> Sakshi Bansal


Why consecutive calls of orderBy are forbidden?

2020-08-24 Thread 洪帆(既起)
Hi, all.
I tried calling orderBy twice in a row on a Table and got an exception.
Can anyone explain why this happens?
I would expect orderBy to be callable on any Table, but evidently it is not,
and the exception gives no explanation.

Here is a simplified version of code:

Table table = btenv.scan("source").orderBy("cola");
table.insertInto("sink");
Table table2 = table.orderBy("colb");
table2.insertInto("sink2");
btenv.execute("testest");

The exception is as follows:

java.lang.NullPointerException
 at 
org.apache.flink.optimizer.dag.SingleInputNode.computeInterestingPropertiesForInputs(SingleInputNode.java:224)
 at 
org.apache.flink.optimizer.traversals.InterestingPropertyVisitor.preVisit(InterestingPropertyVisitor.java:51)
 at 
org.apache.flink.optimizer.traversals.InterestingPropertyVisitor.preVisit(InterestingPropertyVisitor.java:29)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:513)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
 at 
org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
 at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
 at org.apache.flink.optimizer.dag.TwoInputNode.accept(TwoInputNode.java:751)
 at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:493)
 at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
 at 
org.apache.flink.test.util.TestEnvironment.compileProgram(TestEnvironment.java:132)
 at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:105)
 at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:225)
 at 
com.alibaba.alink.operator.batch.dataproc.SqlBatchOpsTest.testOrderBy(SqlBatchOpsTest.java:306)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
 at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
 at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
 at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)


Re: Debezium Flink EMR

2020-08-24 Thread Marta Paes Moreira
Yes — you'll get the full row in the payload; and you can also access the
change operation, which might be useful in your case.

About performance, I'm summoning Kurt and @Jark Wu  to the
thread, who will be able to give you a more complete answer and likely also
some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley  wrote:

> Yup! This definitely helps and makes sense.
>
> The 'after' payload comes with all data from the row right? So essentially
> inserts and updates I can insert/replace data by pk and null values I just
> delete by pk, and then I can build out the rest of my joins like normal.
>
> Are there any performance implications of doing it this way that is
> different from the out-of-the-box 1.11 solution?
>
> On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira 
> wrote:
>
>> Hi, Rex.
>>
>> Part of what enabled CDC support in Flink 1.11 was the refactoring of the
>> table source interfaces (FLIP-95 [1]), and the new ScanTableSource
>> [2], which allows emitting bounded/unbounded streams with insert, update and
>> delete rows.
>>
>> In theory, you could consume data generated with Debezium as regular
>> JSON-encoded events before Flink 1.11 — there just wasn't a convenient way
>> to really treat it as "changelog". As a workaround, what you can do in
>> Flink 1.10 is process these messages as JSON and extract the "after" field
>> from the payload, and then apply de-duplication [3] to keep only the last
>> row.
>>
>> The DDL for your source table would look something like:
>>
>> CREATE TABLE tablename (
>>   ...
>>   after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'json',
>>   ...
>> );
>>
>> Hope this helps!
>>
>> Marta
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication
>>
>>
>> On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler 
>> wrote:
>>
>>> @Jark Would it be possible to use the 1.11 debezium support in 1.10?
>>>
>>> On 20/08/2020 19:59, Rex Fenley wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to set up Flink with Debezium CDC Connector on AWS EMR,
>>> however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived
>>> in Flink 1.11.0, from looking at the documentation.
>>>
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>>>
>>> I'm wondering what alternative solutions are available for connecting
>>> Debezium to Flink? Is there an open source Debezium connector that works
>>> with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0
>>> Debezium connector and compile it in my project using Flink 1.10.0 api?
>>>
>>> For context, I plan on doing some fairly complicated long lived stateful
>>> joins / materialization using the Table API over data ingested from
>>> Postgres and possibly MySQL.
>>>
>>> Appreciate any help, thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com
>>>
>>>
>>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
>
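
The Flink 1.10 workaround described in the quoted reply above (read the Debezium events as plain JSON, take the "after" field, then de-duplicate so that only the last row per key survives) can be modelled in a few lines of plain Java. This is only a sketch of the idea, not Flink code: the ChangeEvent type, its key field, and the choice to skip events without an "after" image are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "take the `after` image, keep the last row per key". Not Flink API. */
final class LastRowPerKeySketch {

    /** Hypothetical, simplified Debezium-style event: a row key plus the "after" image. */
    static final class ChangeEvent {
        final String key;
        final Map<String, Object> after; // null when the event carries no "after" image

        ChangeEvent(String key, Map<String, Object> after) {
            this.key = key;
            this.after = after;
        }
    }

    /** Replays events in arrival order and keeps only the most recent "after" image per key. */
    static Map<String, Map<String, Object>> latestRows(List<ChangeEvent> events) {
        Map<String, Map<String, Object>> latest = new LinkedHashMap<>();
        for (ChangeEvent event : events) {
            // Assumption: events without an "after" image are skipped, not treated as deletes.
            if (event.after != null) {
                latest.put(event.key, event.after); // a later event overwrites an earlier one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("1", Collections.<String, Object>singletonMap("name", "alice")),
                new ChangeEvent("1", Collections.<String, Object>singletonMap("name", "alicia")));
        System.out.println(latestRows(events));
    }
}
```

Because events without an "after" image are skipped here, this model, like the append-only workaround it illustrates, does not propagate deletes.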


Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-24 Thread Aljoscha Krettek

Hi Arti,

what exactly do you mean by "checkpoints do not work"? Are there 
exceptions being thrown? How are you writing your file-based sources, 
what API methods are you using?


Best,
Aljoscha

On 20.08.20 16:21, Arti Pande wrote:

Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomEventTimeWatermarkGenerator
        implements AssignerWithPeriodicWatermarks<MyPojo> {

    // No out-of-orderness is tolerated.
    private final long maxOutOfOrderness = 0;
    // Highest event timestamp seen so far.
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}


*Code for WatermarkStrategy :*

WatermarkStrategy<MyPojo> watermarkStrategy =
    WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
        .withTimestampAssigner((event, timestamp) ->
            event.getInitiationTime().toEpochMilli());
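
Both snippets above follow the same rule: the watermark is the highest event timestamp seen so far minus the allowed out-of-orderness. The following plain-Java model (no Flink dependency; the class and method names are made up for illustration) shows that rule in isolation. It is only a sketch of the arithmetic and makes no claim about why the file source behaves differently.

```java
/** Plain-Java model of a periodic "max timestamp minus out-of-orderness" watermark. Not Flink API. */
final class PeriodicWatermarkSketch {

    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp = Long.MIN_VALUE; // sentinel: no event seen yet

    PeriodicWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Called once per record; plays the role of extractTimestamp() in the assigner above. */
    long onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(eventTimestampMillis, currentMaxTimestamp);
        return eventTimestampMillis;
    }

    /** Called periodically; plays the role of getCurrentWatermark(). */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // avoid underflow before the first event
        }
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch sketch = new PeriodicWatermarkSketch(200);
        sketch.onEvent(1_000);
        sketch.onEvent(900); // late event: does not move the maximum
        System.out.println(sketch.currentWatermark());
    }
}
```

In this model the watermark can only advance when a record with a larger timestamp has passed through onEvent() and currentWatermark() is subsequently called.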


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann  wrote:


Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has been
introduced quite recently and might have some rough edges. I am pulling in
Aljoscha and Klou who have worked on this feature and might be able to help
you. For better understanding your problem, it would be great if you could
share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.

For the file source, the Flink community has recently introduced a new
source abstraction which will also support checkpoints for file sources
once the file source connector has been migrated to the new interfaces. The
community is currently working on it.

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande  wrote:


Hi,

When migrating a Stream API based Flink application from 1.9.2 to 1.11.1,
watermark generation has issues with the file source only. It works well
with the Kafka source.

With 1.9.2 a custom watermark generator implementation of
AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
deprecated and to be replaced with WatermarkStrategy (that combines both
WatermarkGenerator and TimestampAssigner).

With Flink 1.11.1 when using Kafka source both the above options (i.e.
old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
perfectly well but with file source none of them works. The watermark
assigner never increments the watermarks resulting in stateful operators
not clearing their state ever, leading to erroneous results and
continuously increasing memory usage.

Same code works well with Kafka source. Is this a known issue? If so, any
fix planned shortly?

A side note (and probably a candidate for separate email, but I will
write it here) even checkpoints do not work with File Source since 1.9.2
and it is still the problem with 1.11.1. Just wondering if File source with
stream API is not a priority in Flink development? If so we can rethink our
sources.

Thanks & regards,
Arti









Setting job/task manager memory management in kubernetes

2020-08-24 Thread Sakshi Bansal
Hello,

I am trying to set the heap size of job and task manager when deploying the
job in kubernetes. I have set the jobmanager.heap.size and
taskmanager.heap.size.
However, the custom values are not being used and it is creating its own
values and starting the job. How can I set custom values?

-- 
Thanks and Regards
Sakshi Bansal


flink taskmanager ????????????container???? ??yarn kill ????????

2020-08-24 Thread ??????
Hi 
github??Blink(1.5)??flinkTaskmanager
 container??yarn kill.




 1. taskmanager
?? ??8G ?? 36G, state back 
??fileSystem RocksDB.

??taskmanageryarn kill.
 2. dump taskmanager ??
   ?? jmap -dump 
   ?? 
??dump??taskmanager??heartbeat ??kill. 
(??heartbeat??)
 3. debugdump??.
 4. containerized.heap-cutoff-ratio?? oom 
dump??.

Re: Ververica Flink training resources

2020-08-24 Thread David Anderson
Piper,

I'm happy to know that the exercises are working for you.


> The new exercises are running well but I could not adjust the
> servingspeedfactor to speed up the serving of data events. I'm guessing
> this feature was removed in the new repo.
>

That's right. The feature of adjusting the serving speed wasn't needed for
the exercises, and was sometimes a point of confusion during training. It
seemed best to remove this distraction.

Best,
David

On Sun, Aug 23, 2020 at 9:21 PM Piper Piper  wrote:

> Hi David
>
> 1. Thank you for fixing the links!
>
> 2. I downloaded the repo and data files in the middle of the rewriting, so
> the schema mentioned in the repo did not match the files. The new exercises
> are running well but I could not adjust the servingspeedfactor to speed up
> the serving of data events. I'm guessing this feature was removed in the
> new repo.
>
> Best,
> Piper
>
> On Sun, Aug 23, 2020 at 10:15 AM David Anderson 
> wrote:
>
>> Piper,
>>
>> 1. Thanks for reporting the problem with the broken links. I've just
>> fixed this.
>>
>> 2. The exercises were recently rewritten so that they no longer use the
>> old file-based datasets. Now they use data generators that are included in
>> the project. As part of this update, the schema was modified slightly (so
>> that the TaxiRide and TaxiFare types can be serialized with Flink's POJO
>> serializer). Is this causing a problem?
>>
>> Best,
>> David
>>
>> On Sun, Aug 23, 2020 at 12:20 AM Piper Piper 
>> wrote:
>>
>>> Hi Flink community,
>>>
>>> I have two questions regarding the Ververica Flink Training resources.
>>>
>>> 1. In the official Flink documentation, the hyperlinks to the github
>>> sites for the exercises in the "Learn Flink" section are not working. If
>>> possible, please provide me with the correct links for the exercises.
>>>
>>> 2. The schema of the Taxi Fares dataset matches with the old dataset
>>> (nycTaxiFares.gz). However, the schema of the Taxi Ride dataset given in
>>> the Ververica github site does not seem to match the dataset in the old
>>> file (nycTaxiRides.gz). Please advise.
>>>
>>> Given Schema: rideId, taxiId, driverId, isStart, startTime, endTime,
>>> startLon, startLat, endLon, endLat, passengerCnt
>>>
>>> nycTaxiRides.gz sample line (after extracting to file
>>> nycTaxiRides4): 6,START,2013-01-01 00:00:00,1970-01-01
>>> 00:00:00,-73.866135,40.771091,-73.961334,40.764912,6,201306,201306
>>>
>>> Thank you!
>>>
>>> Piper
>>>
>>


flink 1.10 ???????? Managed memory ??????

2020-08-24 Thread ????????
flink 1.10 metric??Heap??NonHeap??Direct ?? Mapped??Managed 
memory?? ??Managed 
memory ps??
  ?? Managed memory 
   Managed memory??




?? flink ?? sum =Heap+NonHeap+Direct+Mapped 
??    
?? sum ?? Managed memory??


?? taskmanager.memory.process.size=8192mb

Re: AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-24 Thread Manas Kale
Thanks Prasanna and Chesnay. Changing the dependency scope worked and I
also had to add a maven shaded plugin transformer to resolve another error.

On Fri, Aug 21, 2020 at 11:38 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Manas,
>
> One option you could try is to set the scope in the dependencies as
> compile for the required artifacts rather than provided.
>
> Prasanna.
>
> On Fri, Aug 21, 2020 at 1:47 PM Chesnay Schepler 
> wrote:
>
>> If this class cannot be found on the classpath then chances are Flink is
>> completely missing from the classpath.
>>
>> I haven't worked with EMR, but my guess is that you did not submit things
>> correctly.
>>
>> From the EMR documentation I could gather that the submission should work
>> without the submitted jar bundling all of Flink;
>>
>> given that your jar works in a local cluster that part should not be the
>> problem.
>>
>> On 21/08/2020 08:16, Manas Kale wrote:
>>
>> Hi,
>> I am trying to deploy a Flink jar on AWS EMR service. I have ensured that
>> Flink v1.10.0 is used in my pom file as that's the version supported by
>> EMR. However, I get the following error:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError: 
>> org/apache/flink/api/java/typeutils/ResultTypeQueryable
>>  at java.lang.ClassLoader.defineClass1(Native Method)
>>  at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>>  at 
>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>  at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>  at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:348)
>>  at org.apache.hadoop.util.RunJar.run(RunJar.java:232)
>>  at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.flink.api.java.typeutils.ResultTypeQueryable
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>  ... 15 more
>>
>> Also, if I deploy this on my local Flink cluster (v1.10.1) it works.
>> I'm not sure what could be the cause. Could it be because of
>> misconfigured classes bundled in the final JAR file or something that was
>> patched in v 1.10.1?
>>
>>
>>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas

On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas  wrote:
>
> Hi Guowei,
>
> Thanks for the insightful comment!
>
> I agree that this can be a limitation of the current runtime, but I
> think that this FLIP can go on as it discusses mainly the semantics
> that the DataStream API will expose when applied on bounded data.
> There will definitely be other FLIPs that will actually handle the
> runtime-related topics.
>
> But it is good to document them nevertheless so that we start soon
> ironing out the remaining rough edges.
>
> Cheers,
> Kostas
>
> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
> >
> > Hi, Klou
> >
> > Thanks for your proposal. It's a very good idea.
> > Just a little comment about the "Batch vs Streaming Scheduling".  In the 
> > AUTOMATIC execution mode maybe we could not pick BATCH execution mode even 
> > if all sources are bounded. For example some applications would use the 
> > `CheckpointListener`, which is not available in the BATCH mode in current 
> > implementation.
> > So maybe we need more checks in the AUTOMATIC execution mode.
> >
> > Best,
> > Guowei
> >
> >
> > On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
> >>
> >> Hi all,
> >>
> >> Thanks for the comments!
> >>
> >> @Dawid: "execution.mode" can be a nice alternative and from a quick
> >> look it is not used currently by any configuration option. I will
> >> update the FLIP accordingly.
> >>
> >> @David: Given that having the option to allow timers to fire at the
> >> end of the job is already in the FLIP, I will leave it as is and I
> >> will update the default policy to be "ignore processing time timers
> >> set by the user". This will allow existing dataStream programs to run
> >> on bounded inputs. This update will affect point 2 in the "Processing
> >> Time Support in Batch" section.
> >>
> >> If these changes cover your proposals, then I would like to start a
> >> voting thread tomorrow evening if this is ok with you.
> >>
> >> Please let me know until then.
> >>
> >> Kostas
> >>
> >> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  
> >> wrote:
> >> >
> >> > Being able to optionally fire registered processing time timers at the 
> >> > end of a job would be interesting, and would help in (at least some of) 
> >> > the cases I have in mind. I don't have a better idea.
> >> >
> >> > David
> >> >
> >> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  
> >> > wrote:
> >> >>
> >> >> Hi Kurt and David,
> >> >>
> >> >> Thanks a lot for the insightful feedback!
> >> >>
> >> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> >> agree with you that it requires a lot more work and careful thinking
> >> >> on the semantics. This FLIP was written under the assumption that if
> >> >> the user wants to have checkpoints on bounded input, he/she will have
> >> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> >> can be handled as a separate topic in the future.
> >> >>
> >> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> >> what are the plans there at the scheduling level, as one could imagine
> >> >> in the future that in mixed workloads, we schedule first all the
> >> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> >> subgraph per application, which is going to be scheduled after all
> >> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> >> towards that direction.
> >> >>
> >> >>
> >> >> @David: The processing time timer handling is a topic that has also
> >> >> been discussed in the community in the past, and I do not remember any
> >> >> final conclusion unfortunately.
> >> >>
> >> >> In the current context and for bounded input, we chose to favor
> >> >> reproducibility of the result, as this is expected in batch processing
> >> >> where the whole input is available in advance. This is why this
> >> >> proposal suggests to not allow processing time timers. But I
> >> >> understand your argument that the user may want to be able to run the
> >> >> same pipeline on batch and streaming; this is why we added the two
> >> >> options under future work, namely (from the FLIP):
> >> >>
> >> >> ```
> >> >> Future Work: In the future we may consider adding as options the 
> >> >> capability of:
> >> >> * firing all the registered processing time timers at the end of a job
> >> >> (at close()) or,
> >> >> * ignoring all the registered processing time timers at the end of a 
> >> >> job.
> >> >> ```
> >> >>
> >> >> Conceptually, we are essentially saying that batch execution is
> >> >> assumed to be instantaneous and refers to a single
> >> >> "point" in time and any 

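Question about historical data with CDC in Flink 1.11

2020-08-24 Thread dixingxin...@163.com
Hi all:
CDC in Flink 1.11 supports loading historical data. I would like to confirm two things:
1. Is Debezium used under the hood to load the historical data?
2. Does Debezium load the full historical data by querying the source table over JDBC? Could this put a sudden burst of load on the database?

I would appreciate any help with these questions. Thanks.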


Best,
Xingxing Di


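Question about the default GC for Flink on YARN

2020-08-24 Thread shizk233
Hi all,

A question: since 1.10, Flink's default GC has been G1, which can also be seen in the taskmanager.sh script.
With the *default settings*, I can see that a local Flink uses G1, but when running on YARN it turns out to use PS. Why is that? Does YARN apply some default settings to applications?

I searched for related material but still could not figure out what is going on. I hope someone who knows can help. Thanks!

Note: I can make Flink on YARN use G1 as well by setting env.java.opts: -XX:+UseG1GC in flink-conf.yaml.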


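Since Flink 1.10, when is task off-heap memory used?

2020-08-24 Thread caozhen
As the subject says, I would like to ask about the rules for setting task off-heap memory (taskmanager.memory.task.off-heap.size):

1. Is it meant for user code that explicitly uses off-heap memory?
2. Or does the Flink framework use off-heap memory in certain situations?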



--
Sent from: http://apache-flink.147419.n8.nabble.com/

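Re: Question about a Hive streaming error

2020-08-24 Thread Rui Li
How did you add the Hive-related dependencies? The two classes have the same package name, so in principle the access should be allowed. I am not sure whether this is caused by them being loaded by different classloaders.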

On Mon, Aug 24, 2020 at 2:17 PM McClone  wrote:

> Version: Flink 1.11.0
>
>
> 2020-08-24 13:33:03,019 ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Unhandled
> exception.
>
> java.lang.IllegalAccessError: tried to access class
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl
> from class
> org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder.<init>(HadoopPathBasedBulkFormatBuilder.java:70)
> ~[?:?]
>
> at
> org.apache.flink.connectors.hive.HiveTableSink.consumeDataStream(HiveTableSink.java:197)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]



-- 
Best regards!
Rui Li


Re: Performance issue associated with managed RocksDB memory

2020-08-24 Thread Juha Mynttinen
The issue can be reproduced by using certain combinations of the value of
RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job
parallelism.

Examples that break:
* Parallelism 1 and WRITE_BUFFER_RATIO 0.1
* Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5

Examples that work:
* Parallelism 1 and WRITE_BUFFER_RATIO 0.5, duration 34164 ms

In a working case (parallelism 1, WRITE_BUFFER_RATIO 0.2) RocksDB log looks
like this (right after the uninteresting bootup messages):

2020/08/21-12:55:41.693771 7f56e6643700 [db/db_impl.cc:1546] Created column
family [valueState] (ID 1)
2020/08/21-12:55:42.213743 7f56e6643700 [db/db_impl_write.cc:1103] Flushing
column family with largest mem table size. Write buffer is using 16789472
bytes out of a total of 17895697.
2020/08/21-12:55:42.213799 7f56e6643700 [db/db_impl_write.cc:1423]
[valueState] New memtable created with log file: #3. Immutable memtables: 0.
2020/08/21-12:55:42.213924 7f56deffd700 (Original Log Time
2020/08/21-12:55:42.213882) [db/db_impl_compaction_flush.cc:1560] Calling
FlushMemTableToOutputFile with column family [valueState], flush slots
available 1, compaction slots available 1, flush slots scheduled 1,
compaction slots scheduled 0
2020/08/21-12:55:42.213927 7f56deffd700 [db/flush_job.cc:304] [valueState]
[JOB 2] Flushing memtable with next log file: 3
2020/08/21-12:55:42.213969 7f56deffd700 EVENT_LOG_v1 {"time_micros":
1598003742213958, "job": 2, "event": "flush_started", "num_memtables": 1,
"num_entries": 170995, "num_deletes": 0, "memory_usage": 8399008,
"flush_reason": "Write Buffer Full"}
2020/08/21-12:55:42.213973 7f56deffd700 [db/flush_job.cc:334] [valueState]
[JOB 2] Level-0 flush table #9: started
2020/08/21-12:55:42.228444 7f56deffd700 EVENT_LOG_v1 {"time_micros":
1598003742228435, "cf_name": "valueState", "job": 2, "event":
"table_file_creation", "file_number": 9, "file_size": 10971,
"table_properties": {"data_size": 10200, "index_size": 168, "filter_size":
0, "raw_key_size": 18000, "raw_average_key_size": 18, "raw_value_size":
8000, "raw_average_value_size": 8, "num_data_blocks": 6, "num_entries":
1000, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}}
2020/08/21-12:55:42.228460 7f56deffd700 [db/flush_job.cc:374] [valueState]
[JOB 2] Level-0 flush table #9: 10971 bytes OK

The main thing to look at is "num_entries": 170995, meaning RocksDB flushes
a memtable with quite large number of entries. It flushes 53 times during
the test, which sounds sensible.

In a breaking case (parallelism 1, WRITE_BUFFER_RATIO 0.1) RocksDB log looks
like this:

2020/08/21-12:53:02.917606 7f2cabfff700 [db/db_impl_write.cc:1103] Flushing
column family with largest mem table size. Write buffer is using 8396784
bytes out of a total of 8947848.
2020/08/21-12:53:02.917702 7f2cabfff700 [db/db_impl_write.cc:1423]
[valueState] New memtable created with log file: #3. Immutable memtables: 0.
2020/08/21-12:53:02.917988 7f2ca8bf1700 (Original Log Time
2020/08/21-12:53:02.917868) [db/db_impl_compaction_flush.cc:1560] Calling
FlushMemTableToOutputFile with column family [valueState], flush slots
available 1, compaction slots available 1, flush slots scheduled 1,
compaction slots scheduled 0
2020/08/21-12:53:02.918004 7f2ca8bf1700 [db/flush_job.cc:304] [valueState]
[JOB 2] Flushing memtable with next log file: 3
2020/08/21-12:53:02.918099 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros":
1598003582918053, "job": 2, "event": "flush_started", "num_memtables": 1,
"num_entries": 29, "num_deletes": 0, "memory_usage": 6320, "flush_reason":
"Write Buffer Full"}
2020/08/21-12:53:02.918118 7f2ca8bf1700 [db/flush_job.cc:334] [valueState]
[JOB 2] Level-0 flush table #9: started
...
2020/08/21-12:55:20.261887 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros":
1598003720261879, "job": 20079, "event": "flush_started", "num_memtables":
1, "num_entries": 29, "num_deletes": 0, "memory_usage": 2240,
"flush_reason": "Write Buffer Full"}
2020/08/21-12:55:20.261892 7f2ca8bf1700 [db/flush_job.cc:334] [valueState]
[JOB 20079] Level-0 flush table #20085: started

This time "num_entries": 29, meaning RocksDB flushes the memtable when there
are only 29 entries consuming 6320 bytes of memory. All memtable flushes look
alike. There are 20079 flushes in total during the test, more than 300 times
as many as with the working config. The memtable flushes, and the compactions
they trigger, kill the performance.
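
The figures quoted from the two RocksDB logs can be put side by side with a little arithmetic. The sketch below uses only numbers that appear in the logs above; the class and method names are made up for illustration, and the code draws no conclusion about the cause.

```java
/** Arithmetic on the figures quoted from the two RocksDB logs above. */
final class FlushStatsSketch {

    /** Fraction of the shared write buffer reported as used when the flush was triggered. */
    static double fillFraction(long usedBytes, long totalBytes) {
        return (double) usedBytes / totalBytes;
    }

    /** Average memtable memory per entry at flush time. */
    static double bytesPerEntry(long memoryUsageBytes, long numEntries) {
        return (double) memoryUsageBytes / numEntries;
    }

    /** How many times more flushes the breaking run needed than the working run. */
    static double flushRatio(long breakingFlushes, long workingFlushes) {
        return (double) breakingFlushes / workingFlushes;
    }

    public static void main(String[] args) {
        // Working case (WRITE_BUFFER_RATIO 0.2).
        System.out.println(fillFraction(16_789_472L, 17_895_697L));
        System.out.println(bytesPerEntry(8_399_008L, 170_995L));
        // Breaking case (WRITE_BUFFER_RATIO 0.1).
        System.out.println(fillFraction(8_396_784L, 8_947_848L));
        System.out.println(bytesPerEntry(6_320L, 29L));
        // 20079 flushes versus 53 flushes.
        System.out.println(flushRatio(20_079L, 53L));
    }
}
```

In both runs the write buffer is reported as roughly 94% used when the flush is triggered, yet in the breaking run the flushed memtable itself accounts for only 6320 bytes, so the reported usage there is clearly not made up of the entries alone.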

It looks like RocksDB flushes way too early, before the memtable should be
considered full. But why? The answer lies in the RocksDB code.

kingspace/frocksdb/db/db_impl_write.cc
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still 

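Re: Flink 1.11.1 integrated with Hive in HDP 3.0.1: queries return no Hive table data

2020-08-24 Thread taochanglian

Tables in Hive 3.0 are transactional by default; add TBLPROPERTIES('transactional'='false') to the CREATE TABLE statement.


On 2020/8/24 15:43, 黄蓉 wrote:

Thanks, everyone:

I have found the cause of the problem. Hive 3.1.0 in HDP 3.0.1 enables
transactions by default, while Flink 1.11.0 apparently does not yet support
transactions when writing to and reading from Hive tables, so the two are
incompatible. After I turned off all the transaction-related settings in Hive,
everything worked.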



Jessie
jessie...@gmail.com

-- Original Message --
From: "taochanglian" 
To: user-zh@flink.apache.org
Sent: 8/24/2020 5:28:56 AM
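Subject: Re: Flink 1.11.1 integrated with Hive in HDP 3.0.1: queries return no Hive table data

Have you downloaded flink-sql-connector-hive-3.1.2 from
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
and put it into lib?


On 2020/8/24 3:01, 黄蓉 wrote:

Hello everyone:

My environment is the HDP 3.0.1 sandbox, with the latest Flink version, 1.11.1,
using the prebuilt jars downloaded directly from the official site. I want to
test Flink's integration with Hive, including querying data from Hive tables
and writing data into Hive tables. The problem I have run into is that queries
through the Flink SQL client return no table data and report no error either,
although the table does have records when queried in Hive. Other statements
such as show tables and show database display normally.


The Hadoop environment variables are configured as follows: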
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop"
export 
HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*"


The sql-client configuration file is as follows:
tables: []
functions: []
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf
execution:
  planner: blink
  type: batch
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0


Is this happening because the official Flink package is incompatible with HDP 3.0.1? Do I need to recompile Flink myself?



Jessie
jessie...@gmail.com




Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on as it discusses mainly the semantics
that the DataStream API will expose when applied on bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless so that we start soon
ironing out the remaining rough edges.

Cheers,
Kostas
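
The AUTOMATIC rule discussed in the quoted thread below (pick BATCH only if all sources are bounded), together with the extra check Guowei suggests, can be sketched as follows. The type and method names are hypothetical and not taken from the FLIP or from Flink, and the single boolean flag is an assumed stand-in for "the job uses something, such as CheckpointListener, that is not available in BATCH mode".

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of how an AUTOMATIC setting could resolve to a concrete mode. */
final class AutomaticModeSketch {

    enum Boundedness { BOUNDED, UNBOUNDED }

    enum ExecutionMode { BATCH, STREAMING }

    /**
     * Picks BATCH only if every source is bounded and the job does not depend on a
     * streaming-only feature; otherwise falls back to STREAMING.
     */
    static ExecutionMode resolve(List<Boundedness> sources, boolean needsStreamingOnlyFeature) {
        boolean allBounded = true;
        for (Boundedness source : sources) {
            if (source != Boundedness.BOUNDED) {
                allBounded = false;
                break;
            }
        }
        if (allBounded && !needsStreamingOnlyFeature) {
            return ExecutionMode.BATCH;
        }
        return ExecutionMode.STREAMING;
    }

    public static void main(String[] args) {
        List<Boundedness> sources = Arrays.asList(Boundedness.BOUNDED, Boundedness.BOUNDED);
        System.out.println(resolve(sources, false));
        System.out.println(resolve(sources, true));
    }
}
```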

On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
>
> Hi, Klou
>
> Thanks for your proposal. It's a very good idea.
> Just a little comment about the "Batch vs Streaming Scheduling".  In the 
> AUTOMATIC execution mode maybe we could not pick BATCH execution mode even if 
> all sources are bounded. For example some applications would use the 
> `CheckpointListener`, which is not available in the BATCH mode in current 
> implementation.
> So maybe we need more checks in the AUTOMATIC execution mode.
>
> Best,
> Guowei
>
>
> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> Thanks for the comments!
>>
>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>> look it is not used currently by any configuration option. I will
>> update the FLIP accordingly.
>>
>> @David: Given that having the option to allow timers to fire at the
>> end of the job is already in the FLIP, I will leave it as is and I
>> will update the default policy to be "ignore processing time timers
>> set by the user". This will allow existing dataStream programs to run
>> on bounded inputs. This update will affect point 2 in the "Processing
>> Time Support in Batch" section.
>>
>> If these changes cover your proposals, then I would like to start a
>> voting thread tomorrow evening if this is ok with you.
>>
>> Please let me know until then.
>>
>> Kostas
>>
>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:
>> >
>> > Being able to optionally fire registered processing time timers at the end 
>> > of a job would be interesting, and would help in (at least some of) the 
>> > cases I have in mind. I don't have a better idea.
>> >
>> > David
>> >
>> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:
>> >>
>> >> Hi Kurt and David,
>> >>
>> >> Thanks a lot for the insightful feedback!
>> >>
>> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
>> >> agree with you that it requires a lot more work and careful thinking
>> >> on the semantics. This FLIP was written under the assumption that if
>> >> the user wants to have checkpoints on bounded input, he/she will have
>> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> >> can be handled as a separate topic in the future.
>> >>
>> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
>> >> should be set to STREAMING. That is why the AUTOMATIC option sets
>> >> scheduling to BATCH only if all the sources are bounded. I am not sure
>> >> what are the plans there at the scheduling level, as one could imagine
>> >> in the future that in mixed workloads, we schedule first all the
>> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>> >> subgraph per application, which is going to be scheduled after all
>> >> Bounded ones have finished. Essentially the bounded subgraphs will be
>> >> used to bootstrap the unbounded one. But, I am not aware of any plans
>> >> towards that direction.
>> >>
>> >>
>> >> @David: The processing time timer handling is a topic that has also
>> >> been discussed in the community in the past, and I do not remember any
>> >> final conclusion unfortunately.
>> >>
>> >> In the current context and for bounded input, we chose to favor
>> >> reproducibility of the result, as this is expected in batch processing
>> >> where the whole input is available in advance. This is why this
>> >> proposal suggests to not allow processing time timers. But I
>> >> understand your argument that the user may want to be able to run the
>> >> same pipeline on batch and streaming; this is why we added the two
>> >> options under future work, namely (from the FLIP):
>> >>
>> >> ```
>> >> Future Work: In the future we may consider adding as options the 
>> >> capability of:
>> >> * firing all the registered processing time timers at the end of a job
>> >> (at close()) or,
>> >> * ignoring all the registered processing time timers at the end of a job.
>> >> ```
>> >>
>> >> Conceptually, we are essentially saying that batch execution is
>> >> assumed to be instantaneous and refers to a single
>> >> "point" in time and any processing-time timers for the future may fire
>> >> at the end of execution or be ignored (but not throw an exception). I
>> >> could also see ignoring the timers in batch as the default, if this
>> >> makes more sense.
>> >>
>> >> By the way, do you have any usecases in mind that will help us better
>> >> shape our processing time timer handling?
>> >>
>> >> Kostas
>> >>
>> >> On 

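Re[2]: Flink 1.11.1 integrated with Hive in HDP 3.0.1: queries return no Hive table data

2020-08-24 Thread 黄蓉

Thanks, everyone:

I have found the cause of the problem. Hive 3.1.0 in HDP 3.0.1 enables
transactions by default, while Flink 1.11.0 apparently does not yet support
transactions when writing to and reading from Hive tables, so the two are
incompatible. After I turned off all the transaction-related settings in Hive,
everything worked.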


Jessie
jessie...@gmail.com

-- Original Message --
From: "taochanglian" 
To: user-zh@flink.apache.org
Sent: 8/24/2020 5:28:56 AM
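Subject: Re: Flink 1.11.1 integrated with Hive in HDP 3.0.1: queries return no Hive table data


Have you downloaded flink-sql-connector-hive-3.1.2 from
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
and put it into lib?

On 2020/8/24 3:01, 黄蓉 wrote:

Hello everyone:

My environment is the HDP 3.0.1 sandbox, with the latest Flink version, 1.11.1,
using the prebuilt jars downloaded directly from the official site. I want to
test Flink's integration with Hive, including querying data from Hive tables
and writing data into Hive tables. The problem I have run into is that queries
through the Flink SQL client return no table data and report no error either,
although the table does have records when queried in Hive. Other statements
such as show tables and show database display normally.

The Hadoop environment variables are configured as follows: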
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop"
export 
HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*"

The sql-client configuration file is as follows:
tables: []
functions: []
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf
execution:
  planner: blink
  type: batch
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0


Could this be because the official Flink package is incompatible with HDP 3.0.1? Do I need to recompile Flink myself?

Jessie
jessie...@gmail.com




Error: Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-24 Thread song wang
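Hi all,
Flink is running on YARN and occasionally reports that it cannot resolve the ResourceManager address, even though a Flink process can be found on the corresponding host. What could be the cause?
Flink version: 1.9.0

Part of the log follows: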
```
2020-08-24 15:11:31,566 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms:
Ask timed out on [ActorSelection[Anchor(akka://flink/),
Path(/user/resourcemanager)]] after [1 ms]. Message of type
[akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
the recipient actor didn't send a reply..
2020-08-24 15:11:51,606 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms:
Ask timed out on [ActorSelection[Anchor(akka://flink/),
Path(/user/resourcemanager)]] after [1 ms]. Message of type
[akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
the recipient actor didn't send a reply..
2020-08-24 15:12:11,645 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms:
Ask timed out on [ActorSelection[Anchor(akka://flink/),
Path(/user/resourcemanager)]] after [1 ms]. Message of type
[akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
the recipient actor didn't send a reply..
2020-08-24 15:12:31,687 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms:
Ask timed out on [ActorSelection[Anchor(akka://flink/),
Path(/user/resourcemanager)]] after [1 ms]. Message of type
[akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
the recipient actor didn't send a reply..
2020-08-24 15:12:51,727 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Could not resolve ResourceManager address
akka.tcp://flink@hostname:16098/user/resourcemanager, retrying in 1 ms:
Ask timed out on [ActorSelection[Anchor(akka://flink/),
Path(/user/resourcemanager)]] after [1 ms]. Message of type
[akka.actor.Identify]. A typical reason for `AskTimeoutException` is that
the recipient actor didn't send a reply..
2020-08-24 15:13:08,198 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
Streaming WordCount (ff0ab7ec3e577a8e0c69e1c8454e5b72) switched from state
RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 9, slots allocated: 0, previous allocation IDs: [], execution
status: completed exceptionally: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
java.util.concurrent.TimeoutException/java.util.concurrent.CompletableFuture@28d7ad5d[Completed
exceptionally], incomplete: java.util.concurrent.CompletableFuture@663cdf7e[Not
completed, 1 dependents], incomplete:
java.util.concurrent.CompletableFuture@2058a7e9[Not completed, 1
dependents], incomplete: java.util.concurrent.CompletableFuture@5c1121c8[Not
completed, 1 dependents], incomplete:
java.util.concurrent.CompletableFuture@49b9c252[Not completed, 1
dependents], incomplete: java.util.concurrent.CompletableFuture@497e3334[Not
completed, 1 dependents], incomplete:
java.util.concurrent.CompletableFuture@2c7ca21d[Not completed, 1
dependents], incomplete: java.util.concurrent.CompletableFuture@7936c93b[Not
completed, 1 dependents], incomplete:
java.util.concurrent.CompletableFuture@7e9a2f1d[Not completed, 1 dependents]
   at
org.apache.flink.runtime.executiongraph.SchedulingUtils.lambda$scheduleEager$1(SchedulingUtils.java:194)
   at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
   at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
   at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
   at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
   at
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:656)
   at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
   at
org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
   at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
```

Details of an error when Flink reads from JDBC: serialization error

2020-08-24 Thread 引领


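Use case: Flink 1.11.1 reading from MySQL keeps reporting a serialization error, even though the bean objects that need to be serialized appear to be serializable. I really cannot figure it out, so I am asking here for help!
The code is attached.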


Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
java.util.concurrent.CountDownLatch@45ca843[Count = 2] is not serializable. The 
object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
at com.hsq.APP.main(APP.java:43)
Caused by: java.io.NotSerializableException: java.util.concurrent.CountDownLatch
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 9 more


引领
yrx73...@163.com

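import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// The generic parameters were stripped by the archive; Tuple2<Long, Integer>
// is inferred from the getLong/getInt calls in run().
public class JdbcReaderTest extends RichSourceFunction<Tuple2<Long, Integer>> {

    private DataSource dataSource;
    private String querySql;
    private Connection conn;
    private PreparedStatement pst;

    public JdbcReaderTest(DataSource dataSource, String querySql) {
        this.dataSource = dataSource;
        this.querySql = querySql;
    }

    /**
     * Opens the database connection.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = dataSource.getConnection();
        pst = conn.prepareStatement(querySql);
    }

    /**
     * Executes the query and emits the results.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws Exception {
        ResultSet rs = pst.executeQuery();
        while (rs.next()) {
            long id = rs.getLong(1);
            int userId = rs.getInt(2);
            Tuple2<Long, Integer> tuple2 = new Tuple2<>(id, userId);
            ctx.collect(tuple2);
        }
        try {
            rs.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the connection.
     */
    @Override
    public void cancel() {
        // DbUtil is a helper that the original message does not show.
        DbUtil.close(conn, pst);
    }
}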


// Test class. URL, MYSQL_HSQ_GROUP, Setting and DSFactory are not shown in the
// original message; the generic parameters were stripped by the archive.
public class APP {

    public static void main(String[] args) throws Exception {

        String url = null;

        if (args.length > 0) {
            url = args[0];
        } else {
            url = URL;
        }

        Setting setting = new Setting(url);
        DSFactory dsFactory = DSFactory.create(setting);
        DataSource dataSource = dsFactory.getDataSource(MYSQL_HSQ_GROUP);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1);

        String sql = "select id,user_id from order limit 10";
        JdbcReaderTest jdbcReader = new JdbcReaderTest(dataSource, sql);
        DataStreamSource<Tuple2<Long, Integer>>
                tuple2DataStreamSource = env.addSource(jdbcReader);
        tuple2DataStreamSource.print();

        env.execute();

    }
}
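
The stack trace shows that `addSource` runs the function through `ClosureCleaner`, which attempts to Java-serialize it together with all of its non-transient fields. One plausible reading, not confirmed anywhere in this thread, is that the `DataSource` handed to the constructor holds a `CountDownLatch` internally. The sketch below reproduces the effect with plain JDK serialization only. `EagerSource` and `LazySource` are hypothetical stand-ins, not Flink classes, and the latch stands in for whatever the pool owns.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class SerializationCheck {

    /** Hypothetical stand-in for a source that receives a live resource in its constructor. */
    static class EagerSource implements Serializable {
        private static final long serialVersionUID = 1L;
        // Stand-in for a connection pool that owns a CountDownLatch.
        private final CountDownLatch latch = new CountDownLatch(2);
        private final String querySql;

        EagerSource(String querySql) {
            this.querySql = querySql;
        }
    }

    /** Same shape, but the resource is transient and only created in open(). */
    static class LazySource implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient CountDownLatch latch;
        private final String querySql;

        LazySource(String querySql) {
            this.querySql = querySql;
        }

        void open() {
            latch = new CountDownLatch(2);
        }
    }

    /** Returns true if plain Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("EagerSource serializable: " + isSerializable(new EagerSource("select 1")));
        System.out.println("LazySource serializable:  " + isSerializable(new LazySource("select 1")));
    }
}
```

Applied to the code above, this would suggest passing only serializable configuration (for example the URL string) into `JdbcReaderTest` and building the `DataSource` inside `open()`, keeping the field `transient`. Whether that matches the poster's setup is an assumption.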

Re: Using CDC in flink1.11

2020-08-24 Thread Dream-底限
OK, thanks.

On Mon, Aug 24, 2020 at 12:21 PM, china_tao  wrote:

> Yes, this is supported.
> insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
> vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable  group by
> TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable
> FOR
> SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin
>
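> Something like this: first open a 10-second window over the Kafka data, then join the MySQL dimension table, then insert into MySQL.
>
> The key is to pay attention to the dimension table's lookup.cache.max-rows and lookup.cache.ttl options, which determine how quickly updates to the dimension table are picked up. It depends on the project; what matters is how much update delay you can tolerate for the dimension table.
> Also, joining a dimension table currently seems to support only pts (a processing-time attribute), not rowtime.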
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
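
To illustrate how a row limit and a TTL interact in a dimension-table lookup cache, here is a minimal sketch. It is not Flink's implementation: `TtlLruCache` and all other names are hypothetical, the clock is passed in explicitly to keep the example deterministic, and eviction is plain LRU with expiry measured from the time an entry was loaded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookupCacheSketch {

    /** A cached value together with the time it was loaded. */
    static final class Cached<V> {
        final V value;
        final long loadedAt;

        Cached(V value, long loadedAt) {
            this.value = value;
            this.loadedAt = loadedAt;
        }
    }

    /** A small LRU cache whose entries also expire ttlMillis after being loaded. */
    static class TtlLruCache<K, V> {
        private final int maxRows;
        private final long ttlMillis;
        private final LinkedHashMap<K, Cached<V>> map;

        TtlLruCache(int maxRows, long ttlMillis) {
            this.maxRows = maxRows;
            this.ttlMillis = ttlMillis;
            // accessOrder = true keeps the least recently used entry first.
            this.map = new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                    return size() > TtlLruCache.this.maxRows;
                }
            };
        }

        /** Returns the cached value, or null if it is absent or older than the TTL. */
        V get(K key, long nowMillis) {
            Cached<V> c = map.get(key);
            if (c == null) {
                return null;
            }
            if (nowMillis - c.loadedAt >= ttlMillis) {
                map.remove(key);
                return null;
            }
            return c.value;
        }

        void put(K key, V value, long nowMillis) {
            map.put(key, new Cached<>(value, nowMillis));
        }

        int size() {
            return map.size();
        }
    }

    public static void main(String[] args) {
        TtlLruCache<String, String> cache = new TtlLruCache<>(2, 1000);
        cache.put("vin1", "brandA", 0);
        // Within the TTL the cached row is served, even if the database row changed.
        System.out.println(cache.get("vin1", 500));
        // After the TTL the entry is dropped and the caller would query the database again.
        System.out.println(cache.get("vin1", 1500));
    }
}
```

The TTL is therefore an upper bound on how stale a joined dimension row can be, which is the trade-off the message above describes.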


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Guowei Ma
Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a small comment about the "Batch vs Streaming Scheduling" section. In the
AUTOMATIC execution mode we may not always be able to pick the BATCH execution
mode, even if all sources are bounded. For example, some applications use the
`CheckpointListener`, which is not available in BATCH mode in the current
implementation.
So we may need more checks in the AUTOMATIC execution mode.

Best,
Guowei
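
The extra check suggested above could look roughly like the following sketch. It is only an illustration of the decision rule under discussion, not code from the FLIP or from Flink: the enum, the `resolve` method and the `usesCheckpointListener` flag are all hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class ExecutionModeSketch {

    enum ExecutionMode { STREAMING, BATCH, AUTOMATIC }

    /**
     * Resolves AUTOMATIC to a concrete mode.
     * BATCH is chosen only if every source is bounded and the job does not rely on
     * a feature assumed to be unavailable in BATCH (here: a checkpoint listener).
     */
    static ExecutionMode resolve(
            ExecutionMode configured,
            List<Boolean> sourceIsBounded,
            boolean usesCheckpointListener) {
        if (configured != ExecutionMode.AUTOMATIC) {
            return configured;
        }
        boolean allBounded = !sourceIsBounded.contains(Boolean.FALSE);
        if (allBounded && !usesCheckpointListener) {
            return ExecutionMode.BATCH;
        }
        return ExecutionMode.STREAMING;
    }

    public static void main(String[] args) {
        List<Boolean> allBounded = Arrays.asList(true, true);
        List<Boolean> mixed = Arrays.asList(true, false);

        System.out.println(resolve(ExecutionMode.AUTOMATIC, allBounded, false));
        System.out.println(resolve(ExecutionMode.AUTOMATIC, allBounded, true));
        System.out.println(resolve(ExecutionMode.AUTOMATIC, mixed, false));
    }
}
```

The second call is the case raised in this message: all sources are bounded, yet the job falls back to STREAMING because it depends on checkpoint notifications.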


On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:

> Hi all,
>
> Thanks for the comments!
>
> @Dawid: "execution.mode" can be a nice alternative and from a quick
> look it is not used currently by any configuration option. I will
> update the FLIP accordingly.
>
> @David: Given that having the option to allow timers to fire at the
> end of the job is already in the FLIP, I will leave it as is and I
> will update the default policy to be "ignore processing time timers
> set by the user". This will allow existing dataStream programs to run
> on bounded inputs. This update will affect point 2 in the "Processing
> Time Support in Batch" section.
>
> If these changes cover your proposals, then I would like to start a
> voting thread tomorrow evening if this is ok with you.
>
> Please let me know until then.
>
> Kostas
>
> On Tue, Aug 18, 2020 at 3:54 PM David Anderson 
> wrote:
> >
> > Being able to optionally fire registered processing time timers at the
> > end of a job would be interesting, and would help in (at least some of) the
> > cases I have in mind. I don't have a better idea.
> >
> > David
> >
> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas 
> wrote:
> >>
> >> Hi Kurt and David,
> >>
> >> Thanks a lot for the insightful feedback!
> >>
> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> agree with you that it requires a lot more work and careful thinking
> >> on the semantics. This FLIP was written under the assumption that if
> >> the user wants to have checkpoints on bounded input, he/she will have
> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> can be handled as a separate topic in the future.
> >>
> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> what are the plans there at the scheduling level, as one could imagine
> >> in the future that in mixed workloads, we schedule first all the
> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> subgraph per application, which is going to be scheduled after all
> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> towards that direction.
> >>
> >>
> >> @David: The processing time timer handling is a topic that has also
> >> been discussed in the community in the past, and I do not remember any
> >> final conclusion unfortunately.
> >>
> >> In the current context and for bounded input, we chose to favor
> >> reproducibility of the result, as this is expected in batch processing
> >> where the whole input is available in advance. This is why this
> >> proposal suggests to not allow processing time timers. But I
> >> understand your argument that the user may want to be able to run the
> >> same pipeline on batch and streaming; this is why we added the two
> >> options under future work, namely (from the FLIP):
> >>
> >> ```
> >> Future Work: In the future we may consider adding as options the
> >> capability of:
> >> * firing all the registered processing time timers at the end of a job
> >> (at close()) or,
> >> * ignoring all the registered processing time timers at the end of a
> >> job.
> >> ```
> >>
> >> Conceptually, we are essentially saying that batch
> >> execution is assumed to be instantaneous and refers to a single
> >> "point" in time and any processing-time timers for the future may fire
> >> at the end of execution or be ignored (but not throw an exception). I
> >> could also see ignoring the timers in batch as the default, if this
> >> makes more sense.
> >>
> >> By the way, do you have any usecases in mind that will help us better
> >> shape our processing time timer handling?
> >>
> >> Kostas
> >>
> >> On Mon, Aug 17, 2020 at 2:52 PM David Anderson 
> wrote:
> >> >
> >> > Kostas,
> >> >
> >> > I'm pleased to see some concrete details in this FLIP.
> >> >
> >> > I wonder if the current proposal goes far enough in the direction of
> >> > recognizing the need some users may have for "batch" and "bounded
> >> > streaming" to be treated differently. If I've understood it correctly, the
> >> > section on scheduling allows me to choose STREAMING scheduling even if I
> >> > have bounded sources. I like that approach, because it recognizes that even
> >> > though I have bounded inputs, I don't necessarily want batch processing
> >> > semantics. I think it makes sense to 

Re: ERROR : RocksDBStateBackend

2020-08-24 Thread Till Rohrmann
Great to hear that you fixed the problem!

Cheers,
Till

On Mon, Aug 24, 2020 at 2:53 AM Vijayendra Yadav 
wrote:

> Thank You Till.  I had an old hadoop version dependency in one of the
> dependent jars causing conflict.
>
> On Fri, Aug 21, 2020 at 12:24 AM Till Rohrmann 
> wrote:
>
>> Hi Vijay,
>>
>> could you move the s3 filesystem
>> dependency lib/flink-s3-fs-hadoop-1.10.0.jar into the plugin directory? See
>> this link [1] for more information. Since Flink 1.10 we have removed the
>> relocation of filesystem dependencies because the recommended way to load
>> them is via Flink's plugin mechanism. I suspect that this dependency is
>> causing the conflicts.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Aug 20, 2020 at 7:06 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Till/ Piotr,
>>>
>>> *My process was working with : FsStateBackend  but when I switched
>>> to RocksDBStateBackend I faced this problem. My class path is below. *
>>>
>>> *Related jar in classpath: *
>>> /usr/lib/hadoop-yarn/hadoop-yarn-api-2.8.5-amzn-6.jar:/usr/lib/hadoop-yarn/hadoop-yarn-api.jar:
>>>
>>>
>>> *Classpath:*
>>> 

Re:Re: Re: How to set FlinkSQL parallelism

2020-08-24 Thread forideal
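Hi 本超,


Thanks for your reply. We did indeed modify the code in this area; the official code behaves correctly. Thank you very much!
  > Currently I also cannot specify the parallelism of individual operators in Flink SQL.
> 1. When the parallelism exceeds the number of topic partitions, resources are wasted.
> 2. Once the parallelism exceeds the number of topic partitions, checkpoints can no longer be triggered normally.
The second problem was caused by our own modifications to the official Flink source code.
 
Best forideal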
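
The first point above (wasted resources once parallelism exceeds the partition count) can be seen with a small sketch. It assumes a simple round-robin assignment of partition `p` to subtask `p % parallelism`; the real Kafka connector's assignment may differ in detail (for instance in its starting offset), and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    /** Assigns partitions 0..numPartitions-1 to subtasks round-robin. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    /** Counts the subtasks that received no partition at all. */
    static int idleSubtasks(List<List<Integer>> assignment) {
        int idle = 0;
        for (List<Integer> partitions : assignment) {
            if (partitions.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // Parallelism set to twice the partition count, as in the experiment quoted below.
        List<List<Integer>> assignment = assign(3, 6);
        System.out.println("assignment:    " + assignment);
        System.out.println("idle subtasks: " + idleSubtasks(assignment));
    }
}
```

With parallelism at twice the partition count, half of the source subtasks hold no partition and only occupy slots.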

At 2020-08-22 11:37:20, "Benchao Li"  wrote:
>Hi forideal,
>
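>I tried this locally and could not reproduce the behavior you describe.
>I also don't see such logic in the code: if a subtask is not assigned any partition, it should block rather than finish.
>Are you testing with the community version, or does it contain any custom changes?
>
>On Fri, Aug 21, 2020 at 11:43 PM, forideal  wrote: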
>
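>> Hi 赵一旦,
>> Background: watermark defined with the `watermark for` syntax, Flink SQL, Blink planner, Flink 1.10.0
>> I recently ran an experiment: I set the Flink SQL parallelism to twice the number of Kafka topic partitions and set the idle timeout to 10s.
>> As a result: 1. half of the source partitions (subtasks) finished immediately
>> 2. the downstream watermark became the maximum Long value
>> The whole job could no longer run normally.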
>>
>>
>> Best forideal
>>
>>
>>
>>
>> 在 2020-08-17 10:05:48,"Zhao,Yi(SEC)"  写道:
>> >我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。
>>
>> >此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。
>> >
>> >非要解决可以写代码,把souce部分不使用sql实现。
>> >__
>> >
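>> >On 2020/8/15 at 8:21 PM, "forideal"  wrote:
>> >
>> >Hi 赵一旦,
>> >
>> >
>> >Currently I also cannot specify the parallelism of individual operators in Flink SQL. I am currently running into two problems.
>> >1. When the parallelism exceeds the number of topic partitions, resources are wasted.
>> >2. Once the parallelism exceeds the number of topic partitions, checkpoints can no longer be triggered normally.
>> >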
>> >
>> >Best forideal
>> >
>> >At 2020-08-14 12:03:32, "赵一旦"  wrote:
>> >>What about checkpoints? For most of you using FlinkSQL, are your jobs the kind that can simply be rerun at any time, rather than the kind that must guarantee uninterrupted accuracy?
>> >>
>> >>On Fri, Aug 14, 2020 at 12:01 PM, Xingbo Huang  wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> Regarding parallelism: as far as I know, the Table API currently cannot set the parallelism for each operator individually.
>> >>>
>> >>> Best,
>> >>> Xingbo
>> >>>
>> >>> On Fri, Aug 14, 2020 at 10:49 AM, Zhao,Yi(SEC)  wrote:
>> >>>
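>> >>> >
>> >>> > Could someone help answer the parallelism question? Also, a related question: besides parallelism, with flink-sql, can checkpoints/savepoints be taken, and can a SQL job be restarted from a checkpoint/savepoint?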
>> >>> >
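>> >>> > From: "Zhao,Yi(SEC)" 
>> >>> > Date: Thursday, August 13, 2020, 11:44 AM
>> >>> > To: "user-zh@flink.apache.org" 
>> >>> > Subject: How to set FlinkSQL parallelism
>> >>> >
>> >>> > The configuration file has execution.parallelism, but that is clearly a global setting.
>> >>> > How can different parallelism be set for the source node, window node, sink node, etc. generated from SQL?
>> >>> >
>> >>> > For example, the source should ideally match the number of Kafka partitions, the window should account for the computational load given the data volume, and the sink has its own considerations too.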
>> >>> >
>> >>> >
>> >>>
>> >
>> >
>>
>
>
>-- 
>
>Best,
>Benchao Li


Question: error with hive streaming

2020-08-24 Thread McClone
Version: Flink 1.11.0


2020-08-24 13:33:03,019 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Unhandled 
exception.

java.lang.IllegalAccessError: tried to access class 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl
 from class 
org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder

at 
org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder.<init>(HadoopPathBasedBulkFormatBuilder.java:70)
 ~[?:?]

at 
org.apache.flink.connectors.hive.HiveTableSink.consumeDataStream(HiveTableSink.java:197)
 ~[?:?]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:114)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]

at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]

at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]