Container is running beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory used; 7.0 GB of 25 GB virtual memory used. Killing container.

2021-03-30 Thread admin
java.lang.Exception: Container 
[pid=17248,containerID=container_1597847003686_12235_01_001336] is running 
beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory 
used; 7.0 GB of 25 GB virtual memory used. Killing container.
Dump of the process-tree for container_1597847003686_12235_01_001336 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 17283 17248 17248 17248 (java) 1025867 190314 7372083200 1311496 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=di-h4-dn-134.h.ab1.qttsite.net -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d 
-Djobmanager.rpc.port=43423 -Drest.address=di-h4-dn-134.h.ab1.qttsite.net 
|- 17248 17246 17248 17248 (bash) 0 0 116015104 353 /bin/bash -c 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='di-h4-dn-134.h.ab1.qttsite.net' -Dweb.port='0' 
-Dweb.tmpdir='/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d' 
-Djobmanager.rpc.port='43423' -Drest.address='di-h4-dn-134.h.ab1.qttsite.net' 
1> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.out
 2> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.err
 

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
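
As a cross-check on the dump above, the JVM flags and the taskmanager.memory.* options can be added up and compared with what YARN measured. The sketch below is only arithmetic on the numbers printed in the log. The class name ContainerMemoryCheck is made up for illustration, and two things are assumptions that should be verified on the node: a 4096-byte memory page, and "5 GB" meaning 5 GiB.

```java
// Arithmetic on the figures printed in the container dump above.
// Assumptions (not stated in the log): 4 KiB memory pages, and "5 GB" meaning 5 GiB.
class ContainerMemoryCheck {
    static final long PAGE_SIZE_BYTES = 4096L;                          // assumed
    static final long CONTAINER_LIMIT_BYTES = 5L * 1024 * 1024 * 1024;  // assumed 5 GiB

    // JVM flags copied from the command line in the dump.
    static final long XMX = 2147483611L;
    static final long MAX_DIRECT = 590558009L;
    static final long MAX_METASPACE = 268435456L;

    // taskmanager.memory.* options copied from the same command line.
    static final long FRAMEWORK_HEAP = 134217728L;
    static final long TASK_HEAP = 2013265883L;
    static final long FRAMEWORK_OFF_HEAP = 134217728L;
    static final long TASK_OFF_HEAP = 0L;
    static final long NETWORK = 456340281L;
    static final long MANAGED = 1825361124L;

    // RSSMEM_USAGE(PAGES) of the java and bash processes in the process tree.
    static final long JAVA_RSS_PAGES = 1311496L;
    static final long BASH_RSS_PAGES = 353L;

    // Framework heap plus task heap; expected to equal -Xmx.
    static long heapBytes() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    // Off-heap plus network memory; expected to equal -XX:MaxDirectMemorySize.
    static long directBytes() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK;
    }

    // Everything that is explicitly bounded by a flag or option in the dump.
    static long budgetedBytes() {
        return XMX + MAX_DIRECT + MAX_METASPACE + MANAGED;
    }

    // Resident set of the whole process tree, converted from pages to bytes.
    static long rssBytes() {
        return (JAVA_RSS_PAGES + BASH_RSS_PAGES) * PAGE_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("heap options equal -Xmx:                 " + (heapBytes() == XMX));
        System.out.println("off-heap + network equal MaxDirectMemory: " + (directBytes() == MAX_DIRECT));
        System.out.println("explicitly budgeted bytes:                " + budgetedBytes());
        System.out.println("headroom left under the limit:            " + (CONTAINER_LIMIT_BYTES - budgetedBytes()));
        System.out.println("measured RSS bytes:                       " + rssBytes());
        System.out.println("RSS above container limit:                " + (rssBytes() > CONTAINER_LIMIT_BYTES));
    }
}
```

Under those assumptions, the heap and direct-memory options add up exactly to -Xmx and -XX:MaxDirectMemorySize, while the measured resident set ends up slightly above the container limit, which is consistent with the "5.0 GB of 5 GB" message. Memory that none of these flags bound (native allocations, for example) also counts toward the resident set.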

Hi :

Unsubscribe

2021-03-30 Thread Y Luo
Unsubscribe


JDBC connector support for JSON

2021-03-30 Thread Fanbin Bu
Hi,

For a streaming job that uses the Kafka connector, this doc
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options
shows that we can parse the JSON data format. However, the Flink JDBC
connector does not seem to support a JSON data type, at least according to this doc:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#data-type-mapping

So the question is: does the JDBC connector also have this capability? If not,
what is required to enable it? At the end of the day, I would like to see
something like this:

create table aTable(field1 type, jsonField1 ROW >)
with
(
'connector' = 'jdbc',
'url' = '...',
'table-name' = 'my-table-with-json-column',
...
)

tEnv.executeSql("select jsonField1.jsonField2.field3 from aTable")

Thanks,
Fanbin


Re: Flink job submitted as a jar restarts by itself after a while

2021-03-30 Thread yidan zhao
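I don't understand the question. Is the job restarting automatically? If it fails, it will naturally restart; that would be the restart strategy you configured.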

valve <903689...@qq.com> wrote on Wed, Mar 31, 2021 at 11:31 AM:

> I ran into this problem too and don't know why.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Reply: Exception when reading data from MySQL in Flink

2021-03-30 Thread Robin Zhang
Hi, air23
JDBCTableSource runs in batch mode only; it does not go through the streaming path. Flink decides this internally when it parses the execution plan.

Best






air23 wrote
> I want to read the data offline here, not in streaming mode.
> The exception I see is: Only insert statement is supported now
>
> On 2021-03-30 10:31:51, "guoyb" <861277329@> wrote:
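>>Yes, it can be read, and there is also the built-in Flink CDC.
>>A SELECT has to go through the query method; check whether you used the wrong execute call.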
>>
>>
>>
>>---Original message---
>>From: "air23" <wangfei23_job@>
>>Sent: Tuesday, March 30, 2021, 10:25 AM
>>To: "user-zh" <user-zh@.apache>
>>Subject: Exception when reading data from MySQL in Flink
>>
>>
>>Hello, I am following the official documentation:
>>https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
>>Reading MySQL data over JDBC here fails with: Exception in thread "main"
>>org.apache.flink.table.api.TableException: Only insert statement is
>>supported now.
>>
>>
>>String a = "-- register a MySQL table 'users' in Flink SQL\n" +
>>"CREATE TABLE MyUserTable (\n" +
>>" id BIGINT\n" +
>>") WITH (\n" +
>>" 'connector' = 'jdbc',\n" +
>>" 'url' = 'jdbc:mysql://***:3306/monitor',\n" +
>>" 'table-name' = 't1',\n" +
>>" 'username' = 'root',\n" +
>>" 'password' = '***'\n" +
>>") ";
>>
>>String b ="-- scan data from the JDBC table\n" +
>>"SELECT id FROM MyUserTable\n";
>>
>>tEnv.executeSql(a);
>>
>>
>>
>>Is it not possible to read data from MySQL?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink job submitted as a jar restarts by itself after a while

2021-03-30 Thread valve
I ran into this problem too and don't know why.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2021-03-30 Thread 张保淇
Unsubscribe

Restoring from Flink Savepoint in Kubernetes not working

2021-03-30 Thread Claude M
Hello,

I have Flink set up as an Application Cluster in Kubernetes, using Flink
version 1.12. I created a savepoint using the curl command, and the status
indicated that it had completed. I then tried to relaunch the job from that
savepoint using the following arguments, as indicated in the doc found
here:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes

args: ["standalone-job", "--job-classname", "", "--job-id",
"", "--fromSavepoint", "s3:///",
"--allowNonRestoredState"]

After the job launches, I check the offsets and they are not the same as
when the savepoint was created.  The job id passed in also does not match
the job id that was launched.  I even put an incorrect savepoint path to
see what happens and there were no errors in the logs and the job still
launches.  It seems these arguments are not even being evaluated.  Any
ideas about this?


Thanks


Checkpoint Aligned question

2021-03-30 Thread 张韩



Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-30 Thread Yang Wang
Hi Fuyao,

Thanks for sharing the progress.

> 1. The flink client is able to list/cancel jobs, based on logs shared
> above, I should be able to ping 144.25.13.78, why I still can NOT ping such
> address?


I think this is an environment problem. Not every IP address can be tested
with the "ping" command. I suggest you use "telnet 144.25.13.78 8081" to
check the network connectivity.
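
When telnet is not installed in the pod, the same TCP-level check can be scripted. This is a minimal sketch that uses only the JDK; the class name PortCheck and the 3-second timeout are arbitrary choices made for this example, and the default address is simply the one from the logs in this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Checks whether a TCP connection to host:port can be established.
// Unlike ping (ICMP), this exercises the same path a REST client would use.
class PortCheck {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws on refusal, timeout, or an unresolvable host name
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address from this thread; override them on the command line.
        String host = args.length > 0 ? args[0] : "144.25.13.78";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
        boolean ok = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Running it once from inside the flink-client pod and once from the laptop should show on which side the connection is being blocked.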

> 2. Why is 144.25.13.78:8081 not accessible from outside, I mean on my
> laptop’s browser. I am within the company’s VPN and such public load
> balancer should expose the flink Web UI, right? I tried to debug the
> network configuration, but failed to find a reason, could you give me some
> hints?


As in my answer above, I think you need to check the network connectivity
via "telnet 144.25.13.78 8081". The firewall may not allow connections from
your local machine (e.g. your local IP is not on the allow list of the
LoadBalancer IP).

> 3. In production, what is the suggested approach to list and cancel jobs? The
> current manual work of “kubectl exec” into pods is not very reliable.. How
> to automate this process and integrate this CI/CD? Please share some blogs
> there is any, thanks.


I think that in a production environment you should have your own deployer,
which takes care of submitting jobs and listing/cancelling them. The
deployer could even help with triggering savepoints and managing the whole
lifecycle of Flink applications. I once developed a PoC of a
native-flink-k8s-operator [1]. It could be a starting point for your own
deployer if you want to develop it in Java.

[1]. https://github.com/wangyang0918/flink-native-k8s-operator
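
One way such a deployer could list and cancel jobs without "kubectl exec" is to call the JobManager REST API directly. The sketch below only builds the two requests and does not send them. It assumes Java 11+ (for java.net.http), the class name FlinkRestRequests is made up for this example, and the paths used here (GET /jobs/overview and PATCH /jobs/<jobid>?mode=cancel) should be checked against the REST API reference for your Flink version.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds REST requests against a JobManager endpoint; nothing is sent here.
// The endpoint paths are assumptions to verify for your Flink version.
class FlinkRestRequests {
    // Request that asks the JobManager for an overview of all jobs.
    static HttpRequest listJobs(String restBaseUrl) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/overview"))
                .GET()
                .build();
    }

    // Request that asks the JobManager to cancel the job with the given id.
    static HttpRequest cancelJob(String restBaseUrl, String jobId) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/" + jobId + "?mode=cancel"))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Address and job id are the ones printed by "flink list" earlier in this thread.
        String base = "http://144.25.13.78:8081";
        HttpRequest list = listJobs(base);
        HttpRequest cancel = cancelJob(base, "eea39629a1931b67eb395207739455ce");
        System.out.println(list.method() + " " + list.uri());
        System.out.println(cancel.method() + " " + cancel.uri());
    }
}
```

To actually send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) from a machine that can reach the REST endpoint.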


Best,
Yang

Fuyao Li wrote on Wed, Mar 31, 2021 at 6:37 AM:

> Hello Yang,
>
>
>
> Thank you so much for providing me the flink-client.yaml. I was able to
> make some progress. I didn’t realize I should create an new pod
> flink-client to list/cancel jobs. I was trying to do such a thing from my
> local laptop. Maybe that is the reason why it doesn’t work. However, I
> still have several questions.
>
>
>
> I created the deployment based on your flink-client.yaml
>
> For the LoadBalancer mode:
>
>
>
> After apply the cluster role binding yaml below.
>
>
>
>
>
> # https://kubernetes.io/docs/reference/access-authn-authz/rbac/
> # https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
> kind: ClusterRole
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
>   namespace: default
>   name: service-reader
> rules:
> - apiGroups: [""] # "" indicates the core API group
>   resources: ["services"]
>   verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>
>
>
> And execute the command:
>
> kubectl create clusterrolebinding service-reader-pod
> --clusterrole=service-reader  --serviceaccount=default:default
>
>
>
> I am able to exec in the flink-client pod and list/cancel jobs.
>
>
>
> $ kubectl exec -it flink-client-776886cf4f-9h47f bash
>
> kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future
> version. Use kubectl exec [POD] -- [COMMAND] instead.
>
> root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target
> kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
>
> 2021-03-30 21:53:14,513 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve
> flink cluster my-first-application-cluster successfully, JobManager Web
> Interface: http://144.25.13.78:8081
>
> Waiting for response...
>
> -- Running/Restarting Jobs ---
>
> 24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming
> Java API Skeleton (RUNNING)
>
> --
>
> No scheduled jobs.
>
> root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
>
> PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.
>
>
>
> ^C
>
> --- 144.25.13.78 ping statistics ---
>
> 31 packets transmitted, 0 received, 100% packet loss, time 772ms
>
>
>
> Question:
>
> 1. The flink client is able to list/cancel jobs, based on logs shared
>    above, I should be able to ping 144.25.13.78, why I still can NOT ping such
>    address?
> 2. Why is 144.25.13.78:8081 not accessible from outside, I mean on my
>    laptop’s browser. I am within the company’s VPN and such public load
>    balancer should expose the flink Web UI, right? I tried to debug the
>    network configuration, but failed to find a reason, could you give me some
>    hints?
> 3. In production, what is the suggested approach to list and cancel
>    jobs? The current manual work of “kubectl exec” into pods is not very
>    reliable.. How to automate this process and integrate this CI/CD? Please
>    share some blogs there is any, thanks.
>
>
>
>
>

Organizing Flink Applications: Mono repo or polyrepo

2021-03-30 Thread Xinbin Huang
Hi community

I am curious about people's experience in structuring Flink applications.
Do you use a mono repo structure (multiple applications in one single repo),
or do you break each application out into its own repo?

If possible, can you share some of your thoughts on the pros/cons of each
approach?

Thanks
Bin


Re: Checkpoint fail due to timeout

2021-03-30 Thread Alexey Trenikhun
Hi Piotrek,

I can't reproduce the problem anymore. Before, the problem happened 2-3 times in a 
row, so I turned off unaligned checkpoints. I have now turned unaligned checkpoints 
back on, but the problem seems to be gone for now. When the problem happened there was no 
progress on the source operators. I thought maybe it was by design that, after 
acknowledgment, a source doesn't produce anything until the checkpoint completes... I 
also have a union of Kafka sources (~50 partitions each), so maybe it is the same as [1].

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Source-Operators-Stuck-in-the-requestBufferBuilderBlocking-td42530.html


From: Piotr Nowojski 
Sent: Tuesday, March 23, 2021 5:31 AM
To: Alexey Trenikhun 
Cc: Arvid Heise ; ChangZhuo Chen (陳昌倬) ; 
ro...@apache.org ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Hi Alexey,

You should definitely investigate why the job is stuck.
1. First of all, is it completely stuck, or is something moving? Use Flink 
metrics [1] (number of bytes/records processed) and go through all of the 
operators/tasks to check this.
2. The stack traces like the one you quoted:
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
you can most likely ignore. Such Task ("Legacy Source Thread - Source: 
digital-itx-eastus2 -> Filter (6/6)#0") is backpressured and the problem lies 
downstream.
3. To check what tasks are backpressured, you can also use Flink metrics - 
check "isBackPressured" metric. Again, back pressured tasks are most likely not 
the source of the problem. Check downstream from the back pressured task.
4. The first (most upstream) task that is not backpressured, but is 
accepting/processing data from some backpressured tasks, is the interesting one. 
It's causing the backpressure and you need to investigate what the problem is. Take 
a look at its stack traces, maybe attach a remote profiler and profile its 
code (if it's making slow progress). Maybe it's stuck in your user code doing 
something.
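
The procedure in steps 3 and 4 can be sketched for the simple case of a linear chain of tasks. The snippet assumes that the "isBackPressured" values have already been collected (for example from the metrics) into a list ordered from upstream to downstream. TaskStatus and BackpressureTriage are hypothetical names for this illustration, not Flink classes, and the downstream task names in main are invented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Finds the task to investigate in a linear chain of tasks, given each
// task's back-pressure flag. Hypothetical helper, not part of Flink.
class BackpressureTriage {
    static final class TaskStatus {
        final String name;
        final boolean backPressured;

        TaskStatus(String name, boolean backPressured) {
            this.name = name;
            this.backPressured = backPressured;
        }
    }

    // The chain is ordered upstream -> downstream. The suspect is the most
    // upstream task that is NOT back-pressured but sits directly downstream
    // of a back-pressured task.
    static Optional<String> findSuspect(List<TaskStatus> chain) {
        for (int i = 1; i < chain.size(); i++) {
            boolean upstreamBlocked = chain.get(i - 1).backPressured;
            boolean selfBlocked = chain.get(i).backPressured;
            if (upstreamBlocked && !selfBlocked) {
                return Optional.of(chain.get(i).name);
            }
        }
        return Optional.empty(); // nothing is back-pressured, or the data is inconsistent
    }

    public static void main(String[] args) {
        List<TaskStatus> chain = Arrays.asList(
                new TaskStatus("Source: digital-itx-eastus2 -> Filter", true),
                new TaskStatus("KeyedProcess (invented name)", true),
                new TaskStatus("Window (invented name)", false),
                new TaskStatus("Sink (invented name)", false));
        System.out.println("Investigate: " + findSuspect(chain).orElse("no suspect found"));
    }
}
```

A real job graph can branch and merge (unions, connects), in which case the same rule has to be applied along every path from the sources rather than along a single list.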

Please let us know what you have found out.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

On Mon, Mar 22, 2021 at 19:18, Alexey Trenikhun <yen...@msn.com> wrote:
Great! I doubt that it will help in my case, however, since in my case even 
unaligned checkpoints get "stuck". Unlike with aligned checkpoints, after an 
unaligned checkpoint is triggered, Flink at some point becomes idle: Kubernetes 
metrics report very little CPU usage by the container, but the unaligned checkpoint 
still times out after 3 hours.


From: Arvid Heise <ar...@apache.org>
Sent: Monday, March 22, 2021 6:58:20 AM
To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
Cc: Alexey Trenikhun <yen...@msn.com>; ro...@apache.org <ro...@apache.org>; 
Flink User Mail List <user@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Hi Alexey,

rescaling from unaligned checkpoints will be supported with the upcoming 1.13 
release (expected at the end of April).

Best,

Arvid

On Wed, Mar 17, 2021 at 8:29 AM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting from the
checkpoint with the same jar file and the new parallelism should do the
trick.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Checkpoint fail due to timeout

2021-03-30 Thread Alexey Trenikhun
I also expected checkpointing to improve at the cost of throughput, but in 
reality I didn't notice a difference in either checkpointing or throughput. 
The backlog was purged by Kafka, so I can't post a thread dump right now, but I doubt 
that the problem is gone, so I will have another chance during the next performance run.

Thanks,
Alexey


From: Roman Khachatryan 
Sent: Tuesday, March 23, 2021 12:17 AM
To: Alexey Trenikhun 
Cc: ChangZhuo Chen (陳昌倬) ; Flink User Mail List 

Subject: Re: Checkpoint fail due to timeout

Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is it what you see?

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x2af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun  wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit the synchronized block on 
> each record. That inverted the behavior: now the Legacy Source thread waits for 
> checkpointLock, while Source is requesting a memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on 
> java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 
> BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> 
> Filter (6/6)#0" Id=199
> at 
> com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
> -  blocked on java.lang.Object@2af646cc
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) 
> Cc: Alexey Trenikhun ; Flink User Mail List 
> 
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬)  
> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> > > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> > > digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > > -  blocked on java.lang.Object@5366a0e2
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > > at 
> > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > > at 
> > > 

Re: Source Operators Stuck in the requestBufferBuilderBlocking

2021-03-30 Thread Sihan You
Awesome. Let me know if you need any other information. Our application makes 
heavy use of event timers and keyed state, and the load is very heavy, if that 
matters.
On Mar 29, 2021, 05:50 -0700, Piotr Nowojski , wrote:
> Hi Sihan,
>
> Thanks for the information. Previously I was not able to reproduce this 
> issue, but after adding a union I think I can see it happening.
>
> Best,
> Piotrek
>
> > On Fri, Mar 26, 2021 at 22:59, Sihan You wrote:
> > > This issue is not always reproducible. It happened 2~3 times in our 
> > > development period of 3 months.
> > >
> > > > On Fri, Mar 26, 2021 at 2:57 PM Sihan You  wrote:
> > > > > Hi,
> > > > >
> > > > > Thanks for responding. I'm working in a commercial organization so I 
> > > > > cannot share the detailed stack with you. I will try to describe the 
> > > > > issue as specifically as I can.
> > > > > 
> > > > > Above are more detailed stats of our job.
> > > > > 1. How long did the job run until it got stuck?
> > > > > about 9 hours.
> > > > > 2. How often do you checkpoint or how many checkpoints succeeded?
> > > > > I don't remember the exact number of the successful checkpoints, but 
> > > > > there should be around 2. then the checkpoint started to fail because 
> > > > > of the timeout.
> > > > > 3. What were the typical checkpoint sizes? How much in-flight data 
> > > > > was checkpointed? (A screenshot of the checkpoint tab in the Flink UI 
> > > > > would suffice)
> > > > > the first checkpoint is 5T and the second is 578G.
> > > > > 4. Was the parallelism of the whole job 5? How is the topology 
> > > > > roughly looking? (e.g., Source -> Map -> Sink?)
> > > > > the source is a union of two source streams. one has a parallelism of 
> > > > > 5 and the other has 80.
> > > > > the job graph is like this.
> > > > > source 1.1 (5 parallelism)  ->
> > > > >                                union ->
> > > > > source 1.2 (80 parallelism) ->
> > > > >                                         connect -> sink
> > > > > source 2.1 (5 parallelism)  ->
> > > > >                                union ->
> > > > > source 2.2 (80 parallelism) ->
> > > > > 5. Did you see any warns/errors in the logs related to checkpointing 
> > > > > and I/O?
> > > > > no error is thrown.
> > > > > 6. What was your checkpoint storage (e.g. S3)? Is the application 
> > > > > running in the same data-center (e.g. AWS)?
> > > > > we are using HDFS as the state backend and the checkpoint dir.
> > > > > the application is running in our own data center and in Kubernetes 
> > > > > as a standalone job.
> > > > >
> > > > > > On Fri, Mar 26, 2021 at 7:31 AM Piotr Nowojski 
> > > > > >  wrote:
> > > > > > > Hi Sihan,
> > > > > > >
> > > > > > > More importantly, could you create some example job that can 
> > > > > > > reproduce that problem? It can have some fake sources and no 
> > > > > > > business logic, but if you could provide us with something like 
> > > > > > > that, it would allow us to analyse the problem without going back 
> > > > > > > and forth with tens of questions.
> > > > > > >
> > > > > > > Best, Piotrek
> > > > > > >
> > > > > > > > On Fri, Mar 26, 2021 at 11:40, Arvid Heise wrote:
> > > > > > > > > Hi Sihan,
> > > > > > > > >
> > > > > > > > > thanks for reporting. This looks like a bug to me. I have 
> > > > > > > > > opened an investigation ticket with the highest priority [1].
> > > > > > > > >
> > > > > > > > > Could you please provide some more context, so we have a 
> > > > > > > > > chance to reproduce?
> > > > > > > > > 1. How long did the job run until it got stuck?
> > > > > > > > > 2. How often do you checkpoint or how many checkpoints 
> > > > > > > > > succeeded?
> > > > > > > > > 3. What were the typical checkpoint sizes? How much in-flight 
> > > > > > > > > data was checkpointed? (A screenshot of the checkpoint tab in 
> > > > > > > > > the Flink UI would suffice)
> > > > > > > > > 4. Was the parallelism of the whole job 5? How is the 
> > > > > > > > > topology roughly looking? (e.g., Source -> Map -> Sink?)
> > > > > > > > > 5. Did you see any warns/errors in the logs related to 
> > > > > > > > > checkpointing and I/O?
> > > > > > > > > 6. What was your checkpoint storage (e.g. S3)? Is the 
> > > > > > > > > application running in the same data-center (e.g. AWS)?
> > > > > > > > >
> > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-21992
> > > > > > > > >
> > > > > > > > > > On Thu, Mar 25, 2021 at 3:00 AM Sihan You 
> > > > > > > > > >  wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I keep seeing the following situation where a task is 
> > > > > > > > > > > blocked getting a MemorySegment from the pool but the 
> > > > > > > > > > > operator is still reporting.
> > > > > > > > > > >
> > > > > > > > > > > I'm completely stumped as to how to debug or what to look 
> > > > > > > > > > > at next so any hints/help/advice 

Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-03-30 Thread Fuyao Li
Hello Yang,

Thank you so much for providing me the flink-client.yaml. I was able to make 
some progress. I didn't realize I should create a new flink-client pod to 
list/cancel jobs. I was trying to do that from my local laptop. Maybe 
that is the reason why it didn't work. However, I still have several questions.

I created the deployment based on your flink-client.yaml
For the LoadBalancer mode:

After applying the cluster role binding yaml below.

# https://kubernetes.io/docs/reference/access-authn-authz/rbac/
# 
https://stackoverflow.com/questions/47973570/kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: service-reader
rules:
- apiGroups: [""] # "" indicates the core API group
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

And execute the command:
kubectl create clusterrolebinding service-reader-pod  
--clusterrole=service-reader  --serviceaccount=default:default

I am able to exec in the flink-client pod and list/cancel jobs.

$ kubectl exec -it flink-client-776886cf4f-9h47f bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future 
version. Use kubectl exec [POD] -- [COMMAND] instead.
root@flink-client-776886cf4f-9h47f:/opt/flink# ./bin/flink list --target 
kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
2021-03-30 21:53:14,513 INFO  
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
flink cluster my-first-application-cluster successfully, JobManager Web 
Interface: http://144.25.13.78:8081
Waiting for response...
-- Running/Restarting Jobs ---
24.03.2021 00:13:04 : eea39629a1931b67eb395207739455ce : Flink Streaming Java 
API Skeleton (RUNNING)
--
No scheduled jobs.
root@flink-client-776886cf4f-9h47f:/opt/flink# ping 144.25.13.78
PING 144.25.13.78 (144.25.13.78) 56(84) bytes of data.

^C
--- 144.25.13.78 ping statistics ---
31 packets transmitted, 0 received, 100% packet loss, time 772ms

Question:

  1.  The flink client is able to list/cancel jobs. Based on the logs shared above, 
I should be able to ping 144.25.13.78, so why can I still NOT ping that address?
  2.  Why is 144.25.13.78:8081 not accessible from outside, I mean from my 
laptop's browser? I am within the company's VPN, and such a public load balancer 
should expose the Flink Web UI, right? I tried to debug the network 
configuration but failed to find a reason. Could you give me some hints?
  3.  In production, what is the suggested approach to list and cancel jobs? 
The current manual work of "kubectl exec" into pods is not very reliable. How 
can this process be automated and integrated with CI/CD? Please share some blogs 
if there are any, thanks.


Best,
Fuyao

From: Yang Wang 
Date: Monday, March 29, 2021 at 20:40
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for trying the native Kubernetes integration.

As you know, the Flink REST service can be exposed in the following three 
ways, configured via "kubernetes.rest-service.exposed.type".

* ClusterIP, which means you can only access the Flink REST endpoint inside 
the K8s cluster. Users can simply start a Flink client in the
K8s cluster via the following yaml file, and use "kubectl exec" to get into 
the pod and create a Flink session/application cluster. The
"flink list/cancel" commands also work well from there.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2
        imagePullPolicy: Always
        args: ["sleep", "86400"]

* NodePort
Currently, we have a limitation that only the Kubernetes master nodes can be 
used to build the exposed Flink rest endpoint. So if your
APIServer node does not run kube-proxy, then the URL printed in the Flink 
client logs cannot be used. We already have a ticket [1] to
support using one of the worker nodes for accessing the rest endpoint, but I 
have not managed to get it done yet.

* LoadBalancer
Is the resolved rest endpoint 
"http://144.25.13.78:8081/"
 accessible on your Flink client side? If yes, then I think the Flink 
client
should be able to contact the JobManager rest server to list/cancel the jobs. I 
have verified this in Alibaba container service, and it works well.


[1]. 

Re: SP with Drain and Cancel hangs after take a SP

2021-03-30 Thread Vishal Santoshi
Great, thanks!

On Tue, Mar 30, 2021 at 11:00 AM Till Rohrmann  wrote:

> This is a good idea. I will add it to the section here [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 2:46 PM Vishal Santoshi 
> wrote:
>
>> Got it. Is it possible to add this very important note to the
>> documentation? Our case is the former, as in this is an infinite pipeline
>> and we were establishing the CI/CD release process for when non-breaking
>> changes (DAG-compatible changes) are made on a running pipe.
>>
>> Regards
>>
>> On Tue, Mar 30, 2021 at 8:14 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> The difference between stop-with-savepoint and
>>> stop-with-savepoint-with-drain is that the latter emits a max watermark
>>> before taking the snapshot. The idea is to trigger all pending timers and
>>> flush the content of some buffering operations like windowing.
>>> Semantically, you should use the first option if you want to stop the job
>>> and resume it at a later point in time. Stop-with-savepoint-with-drain
>>> should only be used if you want to terminate your job and don't intend to
>>> resume it because the max watermark destroys the correctness of results
>>> which are generated after the job is resumed.
>>>
>>> For the concrete problem at hand it is difficult to say why it does not
>>> stop. It would be helpful if you could provide us with the debug logs of
>>> such a run. I am also pulling Arvid who works on Flink's connector
>>> ecosystem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Mar 29, 2021 at 11:08 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> More interested in whether a StreamingFileSink without a drain
>>>> negatively affects its exactly-once semantics, given that the state on SP
>>>> would have the offsets from Kafka + the valid lengths of the part files at
>>>> SP. To be honest, I am not sure whether the flushed buffers on the sink are
>>>> included in the length, or whether this is not an issue with
>>>> StreamingFileSink. If it is the former, then I would assume it should be
>>>> documented, and then we have to look into why this hang happens.
>>>>
>>>> On Mon, Mar 29, 2021 at 4:08 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Is this a known issue? We do a stop + savepoint with drain. I see no
>>>>> back pressure on our operators. It essentially takes a SP and then the
>>>>> Sink (StreamingFileSink to S3) just stays in the RUNNING state.
>>>>>
>>>>> Without drain, stop + savepoint works fine. I would imagine drain is
>>>>> important (flush the buffers etc.), but why this hang? (I did it 3 times
>>>>> and waited 15 minutes each time.)
>>>>>
>>>>> Regards.
>>>>>



IO benchmarking

2021-03-30 Thread deepthi Sridharan
Hi,

I am trying to set up some benchmarking with a couple of IO options for
saving checkpoints and have a couple of questions :

1. Does flink come with any IO benchmarking tools? I couldn't find any. I
was hoping to use those to derive some insights about the storage
performance and extrapolate it for the checkpoint use case.

2. Are there any metrics pertaining to restore from checkpoints? The only
metric I can find is the last restore time, but neither the time it took to
read the checkpoints, nor the time it took to restore the operator/task
states seem to be covered. I am using RocksDB, but couldn't find any
metrics relating to how much time it took to restore the state backend from
rocksdb either.

3. I am trying to find documentation on how the states are serialized into
the checkpoint files from multiple operators and tasks to tailor the
testing use case, but can't seem to find any. Are there any blogs that go
into this detail, or would reading the code be the only option?

--
Thanks,
Deepthi


Proper way to get DataStream<GenericRecord>

2021-03-30 Thread Maminspapin
Hi,

I'm trying to read data from a Kafka topic. The topic holds Avro-formatted
data.

I wrote the following code:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Derive the reader schema from the User class via reflection.
    Schema schema = ReflectData.get().getSchema(User.class);
    FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
            "test_topic",
            // First
            AvroDeserializationSchema.forGeneric(schema),
            // Second
            // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
            //         "http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());

    DataStream<GenericRecord> userStream =
            env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
    userStream.print("users");

    env.execute();
}

So, if I understand correctly, there are two ways to get the result:
1. AvroDeserializationSchema.forGeneric(schema)
2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081")

I use ReflectData.get().getSchema(User.class) to get the schema.


Flink gurus, please tell me whether I am on the right track.


If I use the first way, I get the following error:

java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
at 
org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)

If I use the second way, I get the following error:

Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
expecting cep.model.User, missing required field userId
at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)

How can I get the correct result?

Sorry, if duplicated:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html

This is the third day I have been working on this issue (((






Re: Support for sending generic class

2021-03-30 Thread Le Xu
Hi Gordon and Till:

Thanks for pointing me to the new version! The code I'm using is for a
research project, so it's not on any production deadline. However, I would
like to know about any upcoming updates so there won't be any duplicated work.
A couple of questions I have now:
1. Does 3.0 support context.send(customized) from the DataStream interface? If
I implement a StateFun application in the DataStream API, is there an example
of how to specify customized user types for a DataStream application?
2. I noticed that the 3.0 version supports async operation as specified in
this
.
In the older version I noticed that the async operation is done by
registerAsyncOperation, which ensures the function continuation is still
executed by the Flink worker. It seems that in the new version I can simply use
Java's standard async API. Does StateFun still ensure everything within the
whenComplete() scope is executed as a StateFun message? (Let's say I use
a separate thread pool as a storage client.)

Thanks for the help!

Le


On Tue, Mar 30, 2021 at 8:39 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Le,
>
> Thanks for reaching out with this question! It's actually a good segue to
> allow me to introduce you to StateFun 3.0.0 :)
>
> StateFun 3.0+ comes with a new type system that would eliminate this
> hassle. You can take a sneak peek here [1].
> This is part 1 of a series of tutorials on fundamentals on the upcoming
> new Java SDK (you can find tutorials for other languages there as well),
> and it guides you through a bit on the new type system.
>
> For your specific case, what you would do is implement a `Type` for your
> Tuple3 messages. The `Type` contains information including a typename to
> identify the data type, and a serializer for de-/serializing the data.
> This `Type` can then be used when creating messages to be sent to other
> functions and egresses, or used as the type specification for persisted
> state values.
>
> If you're not in production usage already, I would highly suggest waiting
> a bit for StateFun 3.0.0 as it is just around the corner with an ongoing
> release candidate vote [2] and is expected to be available within 1-2 weeks.
>
> Let me know if this helps!
>
> Cheers,
> Gordon
>
> [1]
> https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html
>
> On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann 
> wrote:
>
>> Hi Le,
>>
>> I am pulling in Gordon who might be able to help you with your question.
>>
>> Looking at the interface Context, it looks that you cannot easily specify
>> a TypeHint for the message you want to send. Hence, I guess that you
>> explicitly need to register these types.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 8:20 AM Le Xu  wrote:
>>
>>> Hello!
>>>
>>> I'm trying to figure out whether Flink StateFun supports sending objects
>>> of a class that has generic parameter types (and potentially nested types).
>>> For example, I send a message that looks like this:
>>>
>>> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
>>> listOfLongObject, Long));
>>>
>>> And obviously I'm getting complaints like this:
>>>
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
>>> TypeInformation from Class alone, because generic parameters are missing.
>>> Please use TypeInformation.of(TypeHint) instead, or another equivalent
>>> method in the API that accepts a TypeHint instead of a Class. For example
>>> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
>>> at
>>> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
>>> at
>>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
>>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>> at
>>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
>>> at
>>> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
>>> at
>>> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
>>> at
>>> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
>>> at
>>> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
>>> at
>>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
>>> at
>>> 

Re: Scala : ClassCastException with Kafka Connector and ObjectNode

2021-03-30 Thread Lehuede sebastien
Hi Till,

That solved my issue ! Many many thanks for the solution and for the useful 
StackOverflow link ! ☺️

Cheers,
Sébastien 

> Le 30 mars 2021 à 18:16, Till Rohrmann  a écrit :
> 
> Hi Sebastien,
> 
> I think the Scala compiler infers the most specific type for deepCopy() which 
> is Nothing (Nothing is the subtype of every type) [1] because you haven't 
> specified a type here. In order to make it work you have to specify the 
> concrete type:
> 
> event.get("value").deepCopy[ObjectNode]()
> 
> [1] https://stackoverflow.com/a/18008591/4815083 
> 
> 
> Cheers,
> Till
> 
> On Tue, Mar 30, 2021 at 3:45 PM Lehuede sebastien wrote:
> Hi all,
> 
> I’m currently trying to use Scala to set up a simple Kafka consumer that 
> receives JSON-formatted events and then just sends them to Elasticsearch. This 
> is the first step; afterwards I want to add some processing logic. 
> 
> My code works well, but the interesting fields from my JSON-formatted events 
> are under the key « value ». So I tried to use a map function to get the 
> fields under « value » and send them to ES.
> 
> But when I try to use this map function, I always get a ClassCastException 
> error. 
> 
> Additional Information :
> 
> • My job runs on a Flink Kubernetes Application cluster with a standalone job.
> • If I remove the map function, everything works fine and I can find my 
> events in Elasticsearch.
> • I tried to replace ‘JSONKeyValueDeserializationSchema’ with 
> ‘SimpleStringSchema’ and then use a mapper in my map function, but I still 
> have the same issue.
> • I’m using Scala version 2.12.12.
> 
> Here is my code : 
> 
> 
> 
> // Create the Flink Kafka connector
> val flinkKafkaConsumer = new FlinkKafkaConsumer(topics, new 
> JSONKeyValueDeserializationSchema(false), kafkaProperties)
> flinkKafkaConsumer.setStartFromEarliest()
> 
> val eventsStream: DataStream[ObjectNode] = env.addSource(flinkKafkaConsumer)
>   .name("Event Stream : Kafka consumer")
>   .map(event => event.get("value").deepCopy())
> 
> eventsStream.map(_.toString).addSink(esSinkBuilder.build()).name("Event 
> Stream : Elasticsearch Stream")
> 
> // execute program
> env.execute("Kafka to ES")
> 
> 
> 
> Here is the error : 
> 
> [nybble-jobmanager] org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> [nybble-jobmanager]at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> [nybble-jobmanager]at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> [nybble-jobmanager]at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
> ~[?:?]
> [nybble-jobmanager]at java.lang.reflect.Method.invoke(Unknown Source) 
> ~[?:?]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> 

Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
Hi Till,

In the version I am using (1.12.0), getFieldNames is not available in Row.
See
https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
.

Is there any workaround for this in version 1.12.0? Thanks.

Best,
Yik San

On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann  wrote:

> There is a method Row.getFieldNames.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
> wrote:
>
>> Hi Till,
>>
>> I looked inside the Row class; it does contain a member `private final
>> Object[] fields;`, but how do I get the column names out of that
>> member?
>>
>> Thanks!
>>
>> Best,
>> Yik San
>>
>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Yik San,
>>>
>>> by converting the rows to a Tuple3 you effectively lose the information
>>> about the column names. You could also call `toRetractStream[Row]` which
>>> will give you a `DataStream[Row]` where you keep the column names.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
>>> wrote:
>>>
>>>> The question is cross-posted on Stack Overflow
>>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>>> .
>>>>
>>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>>> convert it back to a DataStream.
>>>>
>>>> Here is the `SOURCE_DDL`:
>>>>
>>>> ```
>>>> CREATE TABLE kafka_source (
>>>> user_id BIGINT,
>>>> datetime TIMESTAMP(3),
>>>> last_5_clicks STRING
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = 'aiinfra.fct.userfeature.0',
>>>> 'properties.bootstrap.servers' = 'localhost:9092',
>>>> 'properties.group.id' = 'test-group',
>>>> 'format' = 'json'
>>>> )
>>>> ```
>>>>
>>>> With Flink, I execute the DDL.
>>>>
>>>> ```scala
>>>> val settings = EnvironmentSettings.newInstance.build
>>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>>> tableEnv.executeSql(SOURCE_DDL)
>>>> val table = tableEnv.from("kafka_source")
>>>> ```
>>>>
>>>> Then, I convert it into DataStream, and do downstream logic in the
>>>> `map(e => ...)` part.
>>>>
>>>> ```scala
>>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>>>> String)](table).map(e => ...)
>>>> ```
>>>>
>>>> Inside the `map(e => ...)` part, I would like to access the column
>>>> name, in this case, `last_5_clicks`. Why? Because I may have different
>>>> sources with different column names (such as `last_10min_page_view`), but
>>>> I would like to reuse the code in `map(e => ...)`.
>>>>
>>>> Is there a way to do this? Thanks.
>>>>
>>>> Best,
>>>> Yik San
>>>>
>>>


Re: Failure detection in Flink

2021-03-30 Thread Sonam Mandal
Hi Till,

This is really helpful, thanks for the detailed explanation about what happens.
I'll reach out again if I have any further questions. For now I'm just trying to 
understand the various failure scenarios and how they are handled by Flink.

Thanks,
Sonam

From: Till Rohrmann 
Sent: Tuesday, March 30, 2021 8:33 AM
To: Sonam Mandal 
Cc: user@flink.apache.org 
Subject: Re: Failure detection in Flink

Well, the FLIP-6 documentation is probably the best resource albeit being a bit 
outdated.

The components react a bit differently:

JobMaster loses heartbeat with a TaskExecutor: If this happens, then the 
JobMaster will invalidate all slots from this TaskExecutor. This will then fail 
the tasks which have been deployed into these slots. This will then trigger a 
recovery of the affected pipelined region.

TaskExecutor loses heartbeat with a JobMaster: The TaskExecutor will fail all 
currently running tasks belonging to the timed out JobMaster. Moreover, it will 
release all intermediate result partitions it still keeps for this job. The 
slots for this JobMaster will transition to an inactive state. In this state, 
the TaskExecutor will try to reconnect to the JobMaster in order to offer the 
slots. If this is not successful within a configurable timeout, these slots 
will be freed and returned to the ResourceManager.

JobMaster loses heartbeat with the ResourceManager: The JobMaster tries to 
reconnect to the ResourceManager. Until this has happened, the JobMaster cannot 
ask for new slots.

ResourceManager loses heartbeat with the JobMaster: The ResourceManager closes 
the connection to the JobMaster. Moreover, it registers a timeout by which the 
JobMaster needs to reconnect to it. If this does not happen, then the 
ResourceManager will clear the declared resources for the job and clean up the 
internal bookkeeping data structures.
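
As an illustration only, the timeout-based detection that all four cases above rely on can be sketched in plain Java. This is not Flink's implementation: the class HeartbeatMonitor and its methods are hypothetical names, the target IDs are made up, and the 50 000 ms value is assumed to mirror the default of the heartbeat.timeout option. The sketch shows one side (for example a JobMaster) remembering when each peer was last heard from and collecting the peers that have been silent for too long.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of timeout-based failure detection; not Flink's actual classes. */
final class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records that a heartbeat from the given target arrived at time nowMillis. */
    void reportHeartbeat(String targetId, long nowMillis) {
        lastSeen.put(targetId, nowMillis);
    }

    /** Removes and returns every target whose last heartbeat is older than the timeout. */
    List<String> expireTimedOut(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        lastSeen.keySet().removeAll(timedOut);
        return timedOut;
    }

    public static void main(String[] args) {
        // Assumption: 50 s timeout, mirroring the default of heartbeat.timeout.
        HeartbeatMonitor jobMasterView = new HeartbeatMonitor(50_000L);
        jobMasterView.reportHeartbeat("taskexecutor-1", 0L);
        jobMasterView.reportHeartbeat("taskexecutor-2", 0L);
        jobMasterView.reportHeartbeat("taskexecutor-2", 40_000L);

        // At t = 60 s only taskexecutor-1 has been silent for longer than the timeout.
        for (String id : jobMasterView.expireTimedOut(60_000L)) {
            System.out.println("timed out: " + id + " (invalidate its slots, fail its tasks)");
        }
    }
}
```

What happens after a timeout differs per component, as described above; the sketch only covers the detection step, and time is passed in explicitly so the behavior is easy to check.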

I hope this helps a bit to better understand the failover behavior.

If you want to know something in particular, then let me know.

Cheers,
Till

On Tue, Mar 30, 2021 at 4:13 PM Sonam Mandal 
<soman...@linkedin.com> wrote:
Hi Till,

Thanks, this helps! Yes, removing the AKKA related configs will definitely help 
to reduce confusion.

One more question: I was going through FLIP-6, and it does talk about the 
behavior of various components when failures are detected via heartbeat 
timeouts etc. Is this the best reference on how Flink reacts to such failure 
scenarios? If not, can you provide some details on how this works?

Thanks,
Sonam

From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, March 30, 2021 5:02:43 AM
To: Sonam Mandal <soman...@linkedin.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Failure detection in Flink

Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of components. 
This mechanism is independent of the used deployment model. The relevant 
configuration options can be found here [1].

The akka.transport.* options are only for configuring the underlying Akka 
system. Since we are using TCP, Akka's failure detector is not needed [2]. I 
think we should remove these options in order to avoid confusion [3].

The community also thinks about improving the failure detection mechanism 
because in some deployment scenarios we have additional signals available which 
could help us with the detection. But so far we haven't made a lot of progress 
in this area.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
[2] 

Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Till Rohrmann
There is a method Row.getFieldNames.

Cheers,
Till

On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
wrote:

> Hi Till,
>
> I looked inside the Row class; it does contain a member `private final
> Object[] fields;`, but how do I get the column names out of that
> member?
>
> Thanks!
>
> Best,
> Yik San
>
> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
> wrote:
>
>> Hi Yik San,
>>
>> by converting the rows to a Tuple3 you effectively lose the information
>> about the column names. You could also call `toRetractStream[Row]` which
>> will give you a `DataStream[Row]` where you keep the column names.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
>> wrote:
>>
>>> The question is cross-posted on Stack Overflow
>>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>>> .
>>>
>>> I want to consume a Kafka topic into a table using Flink SQL, then
>>> convert it back to a DataStream.
>>>
>>> Here is the `SOURCE_DDL`:
>>>
>>> ```
>>> CREATE TABLE kafka_source (
>>> user_id BIGINT,
>>> datetime TIMESTAMP(3),
>>> last_5_clicks STRING
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = 'aiinfra.fct.userfeature.0',
>>> 'properties.bootstrap.servers' = 'localhost:9092',
>>> 'properties.group.id' = 'test-group',
>>> 'format' = 'json'
>>> )
>>> ```
>>>
>>> With Flink, I execute the DDL.
>>>
>>> ```scala
>>> val settings = EnvironmentSettings.newInstance.build
>>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>>> tableEnv.executeSql(SOURCE_DDL)
>>> val table = tableEnv.from("kafka_source")
>>> ```
>>>
>>> Then, I convert it into DataStream, and do downstream logic in the
>>> `map(e => ...)` part.
>>>
>>> ```scala
>>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>>> String)](table).map(e => ...)
>>> ```
>>>
>>> Inside the `map(e => ...)` part, I would like to access the column name,
>>> in this case, `last_5_clicks`. Why? Because I may have different sources
>>> with different column names (such as `last_10min_page_view`), but I would
>>> like to reuse the code in `map(e => ...)`.
>>>
>>> Is there a way to do this? Thanks.
>>>
>>> Best,
>>> Yik San
>>>
>>


Re: Scala : ClassCastException with Kafka Connector and ObjectNode

2021-03-30 Thread Till Rohrmann
Hi Sebastien,

I think the Scala compiler infers the most specific type for
deepCopy() which is Nothing (Nothing is the subtype of every type) [1]
because you haven't specified a type here. In order to make it work you
have to specify the concrete type:

event.get("value").deepCopy[ObjectNode]()

[1] https://stackoverflow.com/a/18008591/4815083
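
To illustrate why the failure only shows up at runtime, here is a rough Java analogue. It assumes that deepCopy() is declared with a free type parameter (as I believe Jackson's JsonNode does with `<T extends JsonNode> T deepCopy()`); GenericReturnDemo and copyAs are hypothetical stand-ins, not Jackson or Flink code. The method itself never fails because the cast is erased; the ClassCastException surfaces where the caller's chosen type is applied.

```java
/** Hypothetical stand-in for a method whose return type is chosen by the caller. */
final class GenericReturnDemo {

    /** Mimics a deepCopy()-style signature: the caller, not the method, picks T. */
    @SuppressWarnings("unchecked")
    static <T> T copyAs(Object value) {
        return (T) value; // unchecked cast: erased at runtime, so nothing fails here
    }

    public static void main(String[] args) {
        Object node = new StringBuilder("{\"id\":1}");

        // Explicit type argument that matches the real runtime type: works.
        StringBuilder ok = GenericReturnDemo.<StringBuilder>copyAs(node);
        System.out.println("explicit type works: " + ok);

        try {
            // Wrong type argument: the compiler inserts a cast to Integer at the call site.
            Integer bad = GenericReturnDemo.<Integer>copyAs(node);
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at the call site");
        }
    }
}
```

In the Scala case the wrongly chosen type is Nothing rather than Integer, but the remedy is the same: state the type argument explicitly.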

Cheers,
Till

On Tue, Mar 30, 2021 at 3:45 PM Lehuede sebastien 
wrote:

> Hi all,
>
> I’m currently trying to use Scala to set up a simple Kafka consumer that
> receives JSON-formatted events and then just sends them to Elasticsearch.
> This is the first step; afterwards I want to add some processing logic.
>
> My code works well, but the interesting fields from my JSON-formatted events
> are under the key « value ». So I tried to use a map function to get the
> fields under « value » and send them to ES.
>
> But when I try to use this map function, I always get a ClassCastException
> error.
>
> Additional Information :
>
> • My job runs on a Flink Kubernetes Application cluster with a standalone
> job.
> • If I remove the map function, everything works fine and I can find
> my events in Elasticsearch.
> • I tried to replace ‘JSONKeyValueDeserializationSchema’ with
> ‘SimpleStringSchema’ and then use a mapper in my map function, but I still
> have the same issue.
> • I’m using Scala version 2.12.12.
>
> Here is my code :
>
>
>
> // Create the Flink Kafka connector
> val flinkKafkaConsumer = new FlinkKafkaConsumer(topics, new 
> JSONKeyValueDeserializationSchema(false), kafkaProperties)
> flinkKafkaConsumer.setStartFromEarliest()
>
> val eventsStream: DataStream[ObjectNode] = env.addSource(flinkKafkaConsumer)
>   .name("Event Stream : Kafka consumer")
>   .map(event => event.get("value").deepCopy())
>
> eventsStream.map(_.toString).addSink(esSinkBuilder.build()).name("Event 
> Stream : Elasticsearch Stream")
>
> // execute program
> env.execute("Kafka to ES")
>
>
>
> Here is the error :
>
> [nybble-jobmanager] org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> [nybble-jobmanager]at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> [nybble-jobmanager]at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
> [nybble-jobmanager]at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
> ~[?:?]
> [nybble-jobmanager]at java.lang.reflect.Method.invoke(Unknown Source) 
> ~[?:?]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>  ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
> [nybble-jobmanager]at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [akka-actor_2.12-2.5.21.jar:2.5.21]
> [nybble-jobmanager]at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [akka-actor_2.12-2.5.21.jar:2.5.21]
> [nybble-jobmanager]at 
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
> [scala-library-2.12.12.jar:?]
> [nybble-jobmanager] 

Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
Hi Till,

I looked inside the Row class; it does contain a member `private final Object[]
fields;`, but how do I get the column names out of that member?

Thanks!

Best,
Yik San

On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann  wrote:

> Hi Yik San,
>
> by converting the rows to a Tuple3 you effectively lose the information
> about the column names. You could also call `toRetractStream[Row]` which
> will give you a `DataStream[Row]` where you keep the column names.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
> wrote:
>
>> The question is cross-posted on Stack Overflow
>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>> .
>>
>> I want to consume a Kafka topic into a table using Flink SQL, then
>> convert it back to a DataStream.
>>
>> Here is the `SOURCE_DDL`:
>>
>> ```
>> CREATE TABLE kafka_source (
>> user_id BIGINT,
>> datetime TIMESTAMP(3),
>> last_5_clicks STRING
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'aiinfra.fct.userfeature.0',
>> 'properties.bootstrap.servers' = 'localhost:9092',
>> 'properties.group.id' = 'test-group',
>> 'format' = 'json'
>> )
>> ```
>>
>> With Flink, I execute the DDL.
>>
>> ```scala
>> val settings = EnvironmentSettings.newInstance.build
>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>> tableEnv.executeSql(SOURCE_DDL)
>> val table = tableEnv.from("kafka_source")
>> ```
>>
>> Then, I convert it into DataStream, and do downstream logic in the `map(e
>> => ...)` part.
>>
>> ```scala
>> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e
>> => ...)
>> ```
>>
>> Inside the `map(e => ...)` part, I would like to access the column name,
>> in this case, `last_5_clicks`. Why? Because I may have different sources
>> with different column names (such as `last_10min_page_view`), but I would
>> like to reuse the code in `map(e => ...)`.
>>
>> Is there a way to do this? Thanks.
>>
>> Best,
>> Yik San
>>
>


Re: Flink Table to DataStream: how to access column name?

2021-03-30 Thread Till Rohrmann
Hi Yik San,

by converting the rows to a Tuple3 you effectively lose the information
about the column names. You could also call `toRetractStream[Row]` which
will give you a `DataStream[Row]` where you keep the column names.
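
On the follow-up question of where the names live: a `Row` stores positional values, so one option is to read the names once from the table (for example `table.getSchema.getFieldNames` in Flink 1.12) and hand them to the reusable function. The following is only a minimal sketch in plain Java with no Flink dependency. `ColumnLookup`, `indexByName` and `valueOf` are hypothetical names, and an `Object[]` merely stands in for the fields of a `Row`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: resolves a column name to a position once,
// so the per-record code only deals with positional fields.
final class ColumnLookup {
    // Builds a name -> index map from the column names of the table schema.
    static Map<String, Integer> indexByName(String[] columnNames) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < columnNames.length; i++) {
            index.put(columnNames[i], i);
        }
        return index;
    }

    // Returns the value of the named column; `fields` stands in for a Row's positional values.
    static Object valueOf(Map<String, Integer> index, Object[] fields, String column) {
        Integer position = index.get(column);
        if (position == null) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        return fields[position];
    }

    public static void main(String[] args) {
        String[] names = {"user_id", "datetime", "last_5_clicks"};
        Object[] fields = {42L, "2021-03-30 15:52:00", "a,b,c"};
        System.out.println(valueOf(indexByName(names), fields, "last_5_clicks"));
    }
}
```

With this split, the code inside `map(e => ...)` asks for a column by name and can stay the same for a source whose feature column is called `last_10min_page_view` instead.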

Cheers,
Till

On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
> .
>
> I want to consume a Kafka topic into a table using Flink SQL, then convert
> it back to a DataStream.
>
> Here is the `SOURCE_DDL`:
>
> ```
> CREATE TABLE kafka_source (
> user_id BIGINT,
> datetime TIMESTAMP(3),
> last_5_clicks STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'aiinfra.fct.userfeature.0',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'test-group',
> 'format' = 'json'
> )
> ```
>
> With Flink, I execute the DDL.
>
> ```scala
> val settings = EnvironmentSettings.newInstance.build
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
> tableEnv.executeSql(SOURCE_DDL)
> val table = tableEnv.from("kafka_source")
> ```
>
> Then, I convert it into DataStream, and do downstream logic in the `map(e
> => ...)` part.
>
> ```scala
> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e
> => ...)
> ```
>
> Inside the `map(e => ...)` part, I would like to access the column name,
> in this case, `last_5_clicks`. Why? Because I may have different sources
> with different columns names (such as `last_10min_page_view`), but I would
> like to reuse the code in `map(e => ...)`.
>
> Is there a way to do this? Thanks.
>
> Best,
> Yik San
>


Re: Failure detection in Flink

2021-03-30 Thread Till Rohrmann
Well, the FLIP-6 documentation is probably the best resource albeit being a
bit outdated.

The components react a bit differently:

JobMaster loses heartbeat with a TaskExecutor: If this happens, then the
JobMaster will invalidate all slots from this TaskExecutor. This will then
fail the tasks which have been deployed into these slots. This will then
trigger a recovery of the affected pipelined region.

TaskExecutor loses heartbeat with a JobMaster: The TaskExecutor will fail
all currently running tasks belonging to the timed out JobMaster. Moreover,
it will release all intermediate result partitions it still keeps for this
job. The slots for this JobMaster will transition to an inactive state. In
this state, the TaskExecutor will try to reconnect to the JobMaster in
order to offer the slots. If this is not successful within a configurable
timeout, these slots will be freed and returned to the ResourceManager.

JobMaster loses heartbeat with the ResourceManager: The JobMaster tries to
reconnect to the ResourceManager. Until this has happened, the JobMaster
cannot ask for new slots.

ResourceManager loses heartbeat with the JobMaster: The ResourceManager
closes the connection to the JobMaster. Moreover, it registers a timeout
until when a JobMaster needs to reconnect to it. If this does not happen,
then the ResourceManager will clear the declared resources for the job and
clean up the internal bookkeeping data structures.
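
All four cases rest on the same idea: a component remembers when it last heard from a peer and treats the peer as lost once that is longer ago than a timeout. The following is a rough sketch of that idea only, not Flink's implementation. `HeartbeatMonitor` and its methods are hypothetical names, and the 50 second timeout is just an example value (in Flink the timeout is set with the `heartbeat.timeout` option).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of timeout-based failure detection; not Flink's actual implementation.
final class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records that a heartbeat from `target` arrived at time `nowMillis`.
    void reportHeartbeat(String target, long nowMillis) {
        lastHeartbeat.put(target, nowMillis);
    }

    // Returns every monitored target whose last heartbeat is older than the timeout.
    List<String> timedOutTargets(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        HeartbeatMonitor jobMasterView = new HeartbeatMonitor(50_000L);
        jobMasterView.reportHeartbeat("taskexecutor-1", 0L);
        jobMasterView.reportHeartbeat("taskexecutor-2", 40_000L);
        // At t = 60s only taskexecutor-1 has been silent for longer than the timeout.
        System.out.println(jobMasterView.timedOutTargets(60_000L));
    }
}
```

What differs between the four cases is the reaction to a timed-out peer (failing tasks, releasing slots, trying to reconnect), which this sketch deliberately leaves out.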

I hope this helps a bit to better understand the failover behavior.

If you want to know something in particular, then let me know.

Cheers,
Till

On Tue, Mar 30, 2021 at 4:13 PM Sonam Mandal  wrote:

> Hi Till,
>
> Thanks, this helps! Yes, removing the AKKA related configs will definitely
> help to reduce confusion.
>
> One more question, I was going through FLIP-6 and it does talk about the
> behavior of various components when failures are detected via heartbeat
> timeouts etc. is this the best reference on how Flink reacts to such
> failure scenarios? If not, can you provide some details on how this works?
>
> Thanks,
> Sonam
>
> --
> *From:* Till Rohrmann 
> *Sent:* Tuesday, March 30, 2021 5:02:43 AM
> *To:* Sonam Mandal 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Failure detection in Flink
>
> Hi Sonam,
>
> Flink uses its own heartbeat implementation to detect failures of
> components. This mechanism is independent of the used deployment model. The
> relevant configuration options can be found here [1].
>
> The akka.transport.* options are only for configuring the underlying Akka
> system. Since we are using TCP Akka's failure detector is not needed [2]. I
> think we should remove it in order to avoid confusion [3].
>
> The community also thinks about improving the failure detection mechanism
> because in some deployment scenarios we have additional signals available
> which could help us with the detection. But so far we haven't made a lot of
> progress in this area.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
> 
> [2]
> https://doc.akka.io/docs/akka-enhancements/current/config-checker.html#transport-failure-detector
> 
> [3] https://issues.apache.org/jira/browse/FLINK-22048
> 
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 11:01 PM Sonam Mandal 
> wrote:
>
> Hello,
>
> I'm looking for some resources around failure detection in Flink between
> the various components such as Task Manager, Job Manager, Resource Manager,
> etc. For example, how does the Job Manager detect that a Task Manager is
> down (long GC pause or it just crashed)?
>
> There 

Re: StateFun examples in scala

2021-03-30 Thread jose farfan
Hi

Many thx for your quick answer.
I will review the links.

BR
Jose

On Tue, 30 Mar 2021 at 15:22, Tzu-Li (Gordon) Tai 
wrote:

> Hi Jose!
>
> For Scala, we would suggest to wait until StateFun 3.0.0 is released,
> which is actually happening very soon (likely within 1-2 weeks) as there is
> an ongoing release candidate vote [1].
>
> The reason for this is that version 3.0 adds a remote SDK for Java, which
> you should be able to use with Scala (or any other JVM language) seamlessly.
> With StateFun <= 2.x, you only have the option to use embedded functions
> if you'd like to use Java / Scala, which is a bit problematic and we can't
> guarantee that it'll work for all Scala versions.
>
> You can take a look at a preview of the new Java SDK here [2], this is a
> nice tutorial that runs you through all the SDK fundamentals.
> Note that this is not completely finalized yet, as the release voting is
> still ongoing.
>
> Would be great if you want to try the release candidate out already, or
> have some feedback for the new SDK!
>
> Cheers,
> Gordon
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html
> [2]
> https://github.com/apache/flink-statefun-playground/tree/dev/java/showcase
>
> On Tue, Mar 30, 2021 at 8:21 PM Till Rohrmann 
> wrote:
>
>> Hi Jose,
>>
>> I am pulling in Gordon who will be able to help you with your question.
>>
>> Personally, I am not aware of any limitations which prohibit the usage of
>> Scala.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 11:55 AM jose farfan  wrote:
>>
>>> Hi
>>>
>>> I am trying to find some examples written in scala of StateFun.
>>>
>>> But, I cannot find nothing.
>>> My questions is:
>>>
>>>1. is there any problem to use statefun with Scala
>>>2. is there any place with examples written in scala.
>>>
>>> BR
>>> Jose
>>>
>>


Re: SP with Drain and Cancel hangs after take a SP

2021-03-30 Thread Till Rohrmann
This is a good idea. I will add it to the section here [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job

Cheers,
Till

On Tue, Mar 30, 2021 at 2:46 PM Vishal Santoshi 
wrote:

> Got it. Is it possible to add this very important note to the
> documentation. Our case is the former as in this is an infinite pipeline
> and we were establishing the CiCD release process when non breaking
> changes ( DAG compatible changes are made ) on a running pipe.
>
> Regards
>
> On Tue, Mar 30, 2021 at 8:14 AM Till Rohrmann 
> wrote:
>
>> Hi Vishal,
>>
>> The difference between stop-with-savepoint and
>> stop-with-savepoint-with-drain is that the latter emits a max watermark
>> before taking the snapshot. The idea is to trigger all pending timers and
>> flush the content of some buffering operations like windowing.
>> Semantically, you should use the first option if you want to stop the job
>> and resume it at a later point in time. Stop-with-savepoint-with-drain
>> should only be used if you want to terminate your job and don't intend to
>> resume it because the max watermark destroys the correctness of results
>> which are generated after the job is resumed.
>>
>> For the concrete problem at hand it is difficult to say why it does not
>> stop. It would be helpful if you could provide us with the debug logs of
>> such a run. I am also pulling Arvid who works on Flink's connector
>> ecosystem.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 11:08 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> More interested whether a  StreamingFileSink without a drain
>>> negatively affects it's exactly-once semantics , given that I state on SP
>>> would have the offsets from kafka + the valid lengths of the part files at
>>> SP.  To be honest not sure whether the flushed buffers on sink are included
>>> in the length, or this is not an issue with StreamingFileSink. If it is the
>>> former then I would assume we should be documented and then have to look
>>> why this hang happens.
>>>
>>> On Mon, Mar 29, 2021 at 4:08 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Is this a known issue. We do a stop + savepoint with drain. I see no
>>>> back pressure on our operators. It essentially takes a SP and then the SInk
>>>> ( StreamingFileSink to S3 ) just stays in the RUNNING state.
>>>>
>>>> Without drain i stop + savepoint works fine.  I would imagine drain is
>>>> important ( flush the buffers etc  ) but why this hang ( I did it 3 times
>>>> and waited 15 minutes each time ).
>>>>
>>>> Regards.
>>>>
>>>


Re: Failure detection in Flink

2021-03-30 Thread Sonam Mandal
Hi Till,

Thanks, this helps! Yes, removing the AKKA related configs will definitely help 
to reduce confusion.

One more question, I was going through FLIP-6 and it does talk about the 
behavior of various components when failures are detected via heartbeat 
timeouts etc. is this the best reference on how Flink reacts to such failure 
scenarios? If not, can you provide some details on how this works?

Thanks,
Sonam

From: Till Rohrmann 
Sent: Tuesday, March 30, 2021 5:02:43 AM
To: Sonam Mandal 
Cc: user@flink.apache.org 
Subject: Re: Failure detection in Flink

Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of components. 
This mechanism is independent of the used deployment model. The relevant 
configuration options can be found here [1].

The akka.transport.* options are only for configuring the underlying Akka 
system. Since we are using TCP, Akka's failure detector is not needed [2]. I 
think we should remove it in order to avoid confusion [3].

The community also thinks about improving the failure detection mechanism 
because in some deployment scenarios we have additional signals available which 
could help us with the detection. But so far we haven't made a lot of progress 
in this area.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
[2] 
https://doc.akka.io/docs/akka-enhancements/current/config-checker.html#transport-failure-detector
[3] 
https://issues.apache.org/jira/browse/FLINK-22048

Cheers,
Till

On Mon, Mar 29, 2021 at 11:01 PM Sonam Mandal wrote:
Hello,

I'm looking for some resources around failure detection in Flink between the 
various components such as Task Manager, Job Manager, Resource Manager, etc. 
For example, how does the Job Manager detect that a Task Manager is down (long 
GC pause or it just crashed)?

There is some indication of the use of heartbeats, is this via Akka death 
watches or custom heartbeat implementation? Reason I ask is because some 
configurations for timeout are AKKA related, whereas others aren't. I would 
like to understand which timeouts are relevant to which pieces.

e.g. akka.transport.heartbeat.interval vs. heartbeat.interval
I see some earlier posts that mention akka.watch.heartbeat.interval, though 
this is not present on the latest configuration page for Flink.

Also, is this failure detection mechanism the same irrespective of the 
deployment model, i.e. Kubernetes/Yarn/Mesos?

Thanks,
Sonam




Flink Table to DataStream: how to access column name?

2021-03-30 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
.

I want to consume a Kafka topic into a table using Flink SQL, then convert
it back to a DataStream.

Here is the `SOURCE_DDL`:

```
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
```

With Flink, I execute the DDL.

```scala
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
```

Then, I convert it into DataStream, and do downstream logic in the `map(e
=> ...)` part.

```scala
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e
=> ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in
this case, `last_5_clicks`. Why? Because I may have different sources with
different columns names (such as `last_10min_page_view`), but I would like
to reuse the code in `map(e => ...)`.

Is there a way to do this? Thanks.

Best,
Yik San


Scala : ClassCastException with Kafka Connector and ObjectNode

2021-03-30 Thread Lehuede sebastien
Hi all,

I'm currently trying to use Scala to set up a simple Kafka consumer that receives
JSON-formatted events and then just sends them to Elasticsearch. This is the
first step; afterwards I want to add some processing logic.

My code works well, but the interesting fields of my JSON-formatted events are
under the key "value". So I tried to use a map function to get the fields under
"value" and send them to ES.

But when I try to use this map function I always get a ClassCastException
error.

Additional information:

• My job runs on a Flink Kubernetes Application cluster with a standalone job.
• If I remove the map function, everything works fine and I can find my
events in Elasticsearch.
• I tried replacing 'JSONKeyValueDeserializationSchema' with
'SimpleStringSchema' and then using a mapper in my map function, but I still have
the same issue.
• I'm using Scala version 2.12.12.

Here is my code:



// Create the Flink Kafka connector
val flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
  new JSONKeyValueDeserializationSchema(false), kafkaProperties)
flinkKafkaConsumer.setStartFromEarliest()

// Keep only the payload stored under the "value" key of each record
val eventsStream: DataStream[ObjectNode] = env.addSource(flinkKafkaConsumer)
  .name("Event Stream : Kafka consumer")
  .map(event => event.get("value").deepCopy())

eventsStream.map(_.toString).addSink(esSinkBuilder.build())
  .name("Event Stream : Elasticsearch Stream")

// execute program
env.execute("Kafka to ES")



Here is the error : 

[nybble-jobmanager] org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
[nybble-jobmanager] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
[nybble-jobmanager] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
[nybble-jobmanager] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
[nybble-jobmanager] at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
[nybble-jobmanager] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
[nybble-jobmanager] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.12-2.5.21.jar:2.5.21]
[nybble-jobmanager] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
[nybble-jobmanager] at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [scala-library-2.12.12.jar:?]
[nybble-jobmanager] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [scala-library-2.12.12.jar:?]
[nybble-jobmanager] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
[nybble-jobmanager] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [scala-library-2.12.12.jar:?]
[nybble-jobmanager] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [scala-library-2.12.12.jar:?]
[nybble-jobmanager] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 

Re: Support for sending generic class

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Le,

Thanks for reaching out with this question! It's actually a good segue to
allow me to introduce you to StateFun 3.0.0 :)

StateFun 3.0+ comes with a new type system that would eliminate this
hassle. You can take a sneak peek here [1].
This is part 1 of a series of tutorials on the fundamentals of the upcoming new
Java SDK (you can find tutorials for other languages there as well), and it
walks you through the new type system.

For your specific case, what you would do is implement a `Type` for your
Tuple3 messages. The `Type` contains information including a typename to
identify the data type, and a serializer for de-/serializing the data.
This `Type` can then be used when creating messages to be sent to other
functions and egresses, or used as the type specification for persisted
state values.
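
To make the "typename plus serializer" idea concrete, here is a minimal stand-in written in plain Java. It does not use the StateFun SDK, so `TripleType`, `Triple` and the typename string are all hypothetical. It only sketches how a message with a generic `List<Long>` inside can be turned into bytes and back without any reflection on generic parameters.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a message type: a typename plus a hand-written serializer.
// It does not use the StateFun SDK.
final class TripleType {
    // Assumed typename; any unique namespaced string would do.
    static final String TYPENAME = "example/Triple";

    // Plain value class replacing Tuple3<Long, List<Long>, Long>.
    static final class Triple {
        final long first;
        final List<Long> items;
        final long last;

        Triple(long first, List<Long> items, long last) {
            this.first = first;
            this.items = items;
            this.last = last;
        }
    }

    // Layout: first (8 bytes), item count (4 bytes), items (8 bytes each), last (8 bytes).
    static byte[] serialize(Triple message) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + 8 * message.items.size() + 8);
        buffer.putLong(message.first);
        buffer.putInt(message.items.size());
        for (long item : message.items) {
            buffer.putLong(item);
        }
        buffer.putLong(message.last);
        return buffer.array();
    }

    // Reads the fields back in the same order in which serialize wrote them.
    static Triple deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long first = buffer.getLong();
        int count = buffer.getInt();
        List<Long> items = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            items.add(buffer.getLong());
        }
        long last = buffer.getLong();
        return new Triple(first, items, last);
    }

    public static void main(String[] args) {
        Triple copy = deserialize(serialize(new Triple(1L, Arrays.asList(2L, 3L), 4L)));
        System.out.println(TYPENAME + ": " + copy.first + " " + copy.items + " " + copy.last);
    }
}
```

Because the serializer is written by hand, no `TypeHint` is needed: the receiving side selects the deserializer by typename instead of inspecting the class.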

If you're not in production usage already, I would highly suggest waiting a
bit for StateFun 3.0.0 as it is just around the corner with an ongoing
release candidate vote [2] and is expected to be available within 1-2 weeks.

Let me know if this helps!

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html

On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann  wrote:

> Hi Le,
>
> I am pulling in Gordon who might be able to help you with your question.
>
> Looking at the interface Context, it looks that you cannot easily specify
> a TypeHint for the message you want to send. Hence, I guess that you
> explicitly need to register these types.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 8:20 AM Le Xu  wrote:
>
>> Hello!
>>
>> I'm trying to figure out whether Flink Statefun supports sending object
>> with class that has generic parameter types (and potentially nested types).
>> For example, I send a message that looks like this:
>>
>> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
>> listOfLongObject, Long));
>>
>> And obviously I'm getting complaints like this:
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
>> TypeInformation from Class alone, because generic parameters are missing.
>> Please use TypeInformation.of(TypeHint) instead, or another equivalent
>> method in the API that accepts a TypeHint instead of a Class. For example
>> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
>> at
>> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
>> at
>> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
>> at
>> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
>> at
>> org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
>> at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
>> at
>> benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>>
>>
>> Is there any API function that statefun support for parameterized class
>> like this or does the user function need to handle the serialization
>> process -- or is there anyway to quickly modify statefun message interface
>> to support this functionality.
>>
>> Thanks!
>>
>> Le
>>
>>
>>
>>
>>
>>


Re: StateFun examples in scala

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Jose!

For Scala, we would suggest waiting until StateFun 3.0.0 is released, which
is happening very soon (likely within 1-2 weeks) as there is an
ongoing release candidate vote [1].

The reason for this is that version 3.0 adds a remote SDK for Java, which
you should be able to use with Scala (or any other JVM language) seamlessly.
With StateFun <= 2.x, you only have the option to use embedded functions if
you'd like to use Java / Scala, which is a bit problematic and we can't
guarantee that it'll work for all Scala versions.

You can take a look at a preview of the new Java SDK here [2]; it is a
nice tutorial that walks you through all the SDK fundamentals.
Note that this is not completely finalized yet, as the release voting is
still ongoing.

It would be great if you tried out the release candidate already, or
shared some feedback on the new SDK!

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html
[2]
https://github.com/apache/flink-statefun-playground/tree/dev/java/showcase

On Tue, Mar 30, 2021 at 8:21 PM Till Rohrmann  wrote:

> Hi Jose,
>
> I am pulling in Gordon who will be able to help you with your question.
>
> Personally, I am not aware of any limitations which prohibit the usage of
> Scala.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 11:55 AM jose farfan  wrote:
>
>> Hi
>>
>> I am trying to find some examples written in scala of StateFun.
>>
>> But, I cannot find nothing.
>> My questions is:
>>
>>1. is there any problem to use statefun with Scala
>>2. is there any place with examples written in scala.
>>
>> BR
>> Jose
>>
>


Re: Evenly distribute task slots across task-manager

2021-03-30 Thread Till Rohrmann
Hi Vignesh,

if I understand you correctly, then you have a job like:

KafkaSources(parallelism = 64) => Mapper(parallelism = 16) => something else

Moreover, you probably have slot sharing enabled which means that a
KafkaSource and a Mapper can be deployed into the same slot.

So what happens before scheduling is that Flink calculates which Mapper can
run together with which KafkaSource. Since the KafkaSource shuffles the
data any Mapper could run with any KafkaSource. So this means that Flink
pairs Mapper_1 with KafkaSource_1, Mapper_2 with KafkaSource_2, ...,
Mapper_16 with KafkaSource_16. Then Flink tries to allocate the required
slots for the topology. When selecting the slots for the KafkaSources, it
tries to evenly spread them out. Since the parallelism of the sources is
equal to the total number of slots in the cluster, this is not really
visible. Since the order in which the KafkaSources are assigned their slots
is not defined, the Mapper_i end up on arbitrary TaskManagers. That's what
you are observing. I think one could improve the situation if Flink tried
to allocate slots starting with subtask index 1, 2, 3, ..., etc.

If you set the parallelism of the KafkaSources to 16, then you should see
an evenly spread allocation of slots.
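
The effect can be reproduced with a small model. This is only a sketch of the behaviour described above, not Flink's scheduler: `SlotSpreadModel` is a hypothetical name, slot `k` is assumed to live on TaskManager `k % taskManagers`, and the undefined assignment order is modelled with a seeded shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical model of the scheduling behaviour described above; not Flink's scheduler.
final class SlotSpreadModel {
    // Returns how many Mapper subtasks land on each TaskManager.
    static int[] mappersPerTaskManager(int sourceParallelism, int mapperParallelism,
                                       int taskManagers, long seed) {
        // One slot per source subtask; slot k is assumed to live on TaskManager k % taskManagers,
        // which models slots being spread evenly across TaskManagers.
        List<Integer> sourceOrder = new ArrayList<>();
        for (int i = 0; i < sourceParallelism; i++) {
            sourceOrder.add(i);
        }
        // The order in which source subtasks receive their slots is not defined.
        Collections.shuffle(sourceOrder, new Random(seed));

        int[] mappers = new int[taskManagers];
        for (int slot = 0; slot < sourceOrder.size(); slot++) {
            int sourceSubtask = sourceOrder.get(slot);
            // Slot sharing pairs Mapper_i with KafkaSource_i, so only the first
            // mapperParallelism source subtasks bring a Mapper along.
            if (sourceSubtask < mapperParallelism) {
                mappers[slot % taskManagers]++;
            }
        }
        return mappers;
    }

    public static void main(String[] args) {
        // 64 sources, 16 mappers, 2 TaskManagers: the split depends on the arbitrary order.
        System.out.println(Arrays.toString(mappersPerTaskManager(64, 16, 2, 1L)));
        // 16 sources, 16 mappers: every slot hosts a Mapper, so the split is even.
        System.out.println(Arrays.toString(mappersPerTaskManager(16, 16, 2, 1L)));
    }
}
```

In this model the 16/16 case always yields 8 Mappers per TaskManager, while the 64/16 case varies with the seed, which matches the uneven 10/6 or 11/5 splits reported in the thread.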

Cheers,
Till

On Tue, Mar 30, 2021 at 10:05 AM yidan zhao  wrote:

> I think currently flink doesn't support your case, and another idea is
> that you can set the parallelism of all operators to 64, then it will be
> evenly distributed to the two taskmanagers.
>
> Vignesh Ramesh  于2021年3月25日周四 上午1:05写道:
>
>> Hi Matthias,
>>
>> Thanks for your reply. In my case, yes the upstream operator for the
>> operator which is not distributed evenly among task managers is a flink
>> Kafka connector with a rebalance(shuffling).
>>
>> Regards,
>> Vignesh
>>
>> On Tue, 23 Mar, 2021, 6:48 pm Matthias Pohl, 
>> wrote:
>>
>>> There was a similar discussion recently in this mailing list about
>>> distributing the work onto different TaskManagers. One finding Xintong
>>> shared there [1] was that the parameter cluster.evenly-spread-out-slots is
>>> used to evenly allocate slots among TaskManagers but not how the tasks are
>>> actually distributed among the allocated slots. It would be interesting to
>>> know more about your job. If the upstream operator does some shuffling, you
>>> might run into the issue of the task executions not being distributed
>>> evenly anymore.
>>>
>>> Matthias
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evenly-Spreading-Out-Source-Tasks-tp42108p42235.html
>>>
>>> On Tue, Mar 23, 2021 at 1:42 PM Matthias Pohl 
>>> wrote:
>>>
>>>> Hi Vignesh,
>>>> are you trying to achieve an even distribution of tasks for this one
>>>> operator that has the parallelism set to 16? Or do you observe the
>>>> described behavior also on a job level?
>>>> I'm adding Chesnay to the thread as he might have more insights on this
>>>> topic.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh 
>>>> wrote:
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> Can someone help me with a solution?
>>>>>
>>>>> I have a flink job(2 task-managers) with a job parallelism of 64 and
>>>>> task slot of 64.
>>>>> I have a parallelism set for one of the operators as 16. This
>>>>> operator(16 parallelism) slots are not getting evenly distributed across
>>>>> two task managers. It often takes higher task slots like 10/11 in one task
>>>>> manager and 5/6 in other task manager.
>>>>>
>>>>> I'm using flink version 1.11.2. I tried adding
>>>>> cluster.evenly-spread-out-slots: true but it didn't work. Any solution is
>>>>> greatly appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Regards,
>>>>> Vignesh
>>>>>
>>>>>


Re: SP with Drain and Cancel hangs after take a SP

2021-03-30 Thread Vishal Santoshi
Got it. Is it possible to add this very important note to the
documentation? Our case is the former: this is an infinite pipeline,
and we were establishing the CI/CD release process for when non-breaking
(DAG-compatible) changes are made to a running pipeline.

Regards

On Tue, Mar 30, 2021 at 8:14 AM Till Rohrmann  wrote:

> Hi Vishal,
>
> The difference between stop-with-savepoint and
> stop-with-savepoint-with-drain is that the latter emits a max watermark
> before taking the snapshot. The idea is to trigger all pending timers and
> flush the content of some buffering operations like windowing.
> Semantically, you should use the first option if you want to stop the job
> and resume it at a later point in time. Stop-with-savepoint-with-drain
> should only be used if you want to terminate your job and don't intend to
> resume it because the max watermark destroys the correctness of results
> which are generated after the job is resumed.
>
> For the concrete problem at hand it is difficult to say why it does not
> stop. It would be helpful if you could provide us with the debug logs of
> such a run. I am also pulling Arvid who works on Flink's connector
> ecosystem.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 11:08 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> More interested whether a  StreamingFileSink without a drain
>> negatively affects it's exactly-once semantics , given that I state on SP
>> would have the offsets from kafka + the valid lengths of the part files at
>> SP.  To be honest not sure whether the flushed buffers on sink are included
>> in the length, or this is not an issue with StreamingFileSink. If it is the
>> former then I would assume we should be documented and then have to look
>> why this hang happens.
>>
>> On Mon, Mar 29, 2021 at 4:08 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Is this a known issue. We do a stop + savepoint with drain. I see no
>>> back pressure on our operators. It essentially takes a SP and then the SInk
>>> ( StreamingFileSink to S3 ) just stays in the RUNNING state.
>>>
>>> Without drain i stop + savepoint works fine.  I would imagine drain is
>>> important ( flush the buffers etc  ) but why this hang ( I did it 3 times
>>> and waited 15 minutes each time ).
>>>
>>> Regards.
>>>
>>
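
The point in the quoted explanation about the max watermark can be illustrated with a toy timer model. This is a sketch only, not Flink's timer service: `TimerModel` and its methods are hypothetical names, and each timestamp holds at most one timer to keep the code short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of event-time timers; not Flink's timer service.
final class TimerModel {
    // Pending timers keyed by the event time at which they should fire.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    void register(long fireAt, String name) {
        pending.put(fireAt, name);
    }

    // Advances event time to `watermark` and returns the timers that fire, in time order.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>(pending.headMap(watermark, true).values());
        pending.headMap(watermark, true).clear();
        return fired;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimerModel timers = new TimerModel();
        timers.register(1_000L, "window-1");
        timers.register(2_000L, "window-2");
        // A regular watermark fires only the timers that are due.
        System.out.println(timers.advanceWatermark(1_500L));
        // Draining emits the maximum watermark, which fires everything that is left.
        System.out.println(timers.advanceWatermark(Long.MAX_VALUE));
    }
}
```

After the maximum watermark nothing is pending any more, so a job resumed from such a savepoint would treat every later record as late. That is the reason drain is meant for jobs that will not be resumed.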


Re: StateFun examples in scala

2021-03-30 Thread Till Rohrmann
Hi Jose,

I am pulling in Gordon who will be able to help you with your question.

Personally, I am not aware of any limitations which prohibit the usage of
Scala.

Cheers,
Till

On Tue, Mar 30, 2021 at 11:55 AM jose farfan  wrote:

> Hi
>
> I am trying to find some StateFun examples written in Scala.
>
> But I cannot find any.
> My questions are:
>
>    1. Is there any problem with using StateFun with Scala?
>    2. Is there any place with examples written in Scala?
>
> BR
> Jose
>


Re: Support for sending generic class

2021-03-30 Thread Till Rohrmann
Hi Le,

I am pulling in Gordon who might be able to help you with your question.

Looking at the interface Context, it looks like you cannot easily specify a
TypeHint for the message you want to send. Hence, I guess that you
need to register these types explicitly.
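
For background on why a Class alone is not enough: generic arguments are erased from Class objects, and a type hint works around this by capturing them in an anonymous subclass. The following is a minimal plain-Java sketch of that idiom. `TypeCapture` is a hypothetical stand-in written for illustration; it is not Flink's TypeHint and not part of the StateFun API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for the "type hint" idiom; not part of Flink or StateFun.
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // An anonymous subclass records its generic superclass, including T.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type captured() {
        return captured;
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        // A Class object carries no type arguments: only the raw type is known.
        System.out.println(List.class.getTypeName());

        // The anonymous subclass ("{}") keeps the full parameterized type.
        Type full = new TypeCapture<List<Long>>() {}.captured();
        System.out.println(full.getTypeName());
    }
}
```

The trailing `{}` is what matters here: without the anonymous subclass there is no generic superclass to inspect, which is the situation the exception in the quoted message complains about.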

Cheers,
Till

On Tue, Mar 30, 2021 at 8:20 AM Le Xu  wrote:

> Hello!
>
> I'm trying to figure out whether Flink Statefun supports sending object
> with class that has generic parameter types (and potentially nested types).
> For example, I send a message that looks like this:
>
> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
> listOfLongObject, Long));
>
> And obviously I'm getting complaints like this:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
> TypeInformation from Class alone, because generic parameters are missing.
> Please use TypeInformation.of(TypeHint) instead, or another equivalent
> method in the API that accepts a TypeHint instead of a Class. For example
> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
> at
> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
> at
> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
> at
> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
> at
> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
> at
> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
> at
> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
> at
> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
> at
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
> at
> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
> at
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
> at
> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
> at
> org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
> at
> org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
> at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
> at
> benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
> at
> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>
>
> Is there any API function that statefun support for parameterized class
> like this or does the user function need to handle the serialization
> process -- or is there anyway to quickly modify statefun message interface
> to support this functionality.
>
> Thanks!
>
> Le
>
>
>
>
>
>


Re: SP with Drain and Cancel hangs after take a SP

2021-03-30 Thread Till Rohrmann
Hi Vishal,

The difference between stop-with-savepoint and
stop-with-savepoint-with-drain is that the latter emits a max watermark
before taking the snapshot. The idea is to trigger all pending timers and
flush the content of some buffering operations like windowing.
Semantically, you should use the first option if you want to stop the job
and resume it at a later point in time. Stop-with-savepoint-with-drain
should only be used if you want to terminate your job and don't intend to
resume it because the max watermark destroys the correctness of results
which are generated after the job is resumed.
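
To make the effect of the max watermark concrete, here is a toy model in plain Java. It is only a sketch under simplifying assumptions: `ToyTimerService` and its methods are made up for illustration and are not Flink's timer API. Timers fire when the watermark reaches their timestamp, so a watermark of Long.MAX_VALUE fires everything that is still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of event-time timers; names are hypothetical, not Flink API.
class ToyTimerService {
    // Pending timers, keyed by the timestamp at which they should fire.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    void register(long timestamp, String name) {
        pending.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(name);
    }

    // Advancing the watermark fires every timer with timestamp <= watermark.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            fired.addAll(pending.pollFirstEntry().getValue());
        }
        return fired;
    }

    int pendingCount() {
        return pending.values().stream().mapToInt(List::size).sum();
    }
}

class DrainDemo {
    public static void main(String[] args) {
        ToyTimerService timers = new ToyTimerService();
        timers.register(1_000L, "window-A");
        timers.register(60_000L, "window-B");

        // A regular watermark only fires the timers that are already due.
        System.out.println(timers.advanceWatermark(5_000L));

        // The max watermark fires everything that is left.
        System.out.println(timers.advanceWatermark(Long.MAX_VALUE));
        System.out.println(timers.pendingCount());
    }
}
```

In this model nothing remains pending after the second call, which is the flushing that drain is meant to achieve. The watermark also cannot advance any further afterwards, which is the reason given above for not resuming from such a savepoint.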

For the concrete problem at hand it is difficult to say why it does not
stop. It would be helpful if you could provide us with the debug logs of
such a run. I am also pulling Arvid who works on Flink's connector
ecosystem.

Cheers,
Till

On Mon, Mar 29, 2021 at 11:08 PM Vishal Santoshi 
wrote:

> I am more interested in whether a StreamingFileSink without a drain
> negatively affects its exactly-once semantics, given that the state in the SP
> would have the offsets from Kafka + the valid lengths of the part files at
> SP. To be honest, I am not sure whether the flushed buffers on the sink are included
> in the length, or whether this is not an issue with StreamingFileSink. If it is the
> former then I would assume it should be documented, and then we have to look at
> why this hang happens.
>
> On Mon, Mar 29, 2021 at 4:08 PM Vishal Santoshi 
> wrote:
>
>> Is this a known issue? We do a stop + savepoint with drain. I see no back
>> pressure on our operators. It essentially takes a SP and then the sink
>> (StreamingFileSink to S3) just stays in the RUNNING state.
>>
>> Without drain, stop + savepoint works fine. I would imagine drain is
>> important (flush the buffers etc.), but why this hang? (I did it 3 times
>> and waited 15 minutes each time.)
>>
>> Regards.
>>
>


Re: Failure detection in Flink

2021-03-30 Thread Till Rohrmann
Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of
components. This mechanism is independent of the used deployment model. The
relevant configuration options can be found here [1].
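
As a rough illustration of heartbeat-based failure detection in general, here is a toy monitor in plain Java. This is a sketch, not Flink's implementation: `ToyHeartbeatMonitor`, its methods, and the timeout value are hypothetical, and the clock is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Toy heartbeat monitor; hypothetical names, not Flink's implementation.
class ToyHeartbeatMonitor {
    private final long timeoutMillis;
    // Timestamp of the last heartbeat received from each component.
    private final Map<String, Long> lastSeen = new HashMap<>();

    ToyHeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void reportHeartbeat(String componentId, long nowMillis) {
        lastSeen.put(componentId, nowMillis);
    }

    // A component counts as dead if it was never seen or has been silent
    // for longer than the timeout.
    boolean isConsideredDead(String componentId, long nowMillis) {
        Long last = lastSeen.get(componentId);
        return last == null || nowMillis - last > timeoutMillis;
    }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        ToyHeartbeatMonitor monitor = new ToyHeartbeatMonitor(50L);
        monitor.reportHeartbeat("taskmanager-1", 0L);

        // Still within the timeout window.
        System.out.println(monitor.isConsideredDead("taskmanager-1", 40L));
        // Silent for longer than the timeout.
        System.out.println(monitor.isConsideredDead("taskmanager-1", 100L));
    }
}
```

In this toy model a long pause and a crash look the same to the monitor: all it observes is that no heartbeat arrived within the timeout.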

The akka.transport.* options are only for configuring the underlying Akka
system. Since we are using TCP, Akka's failure detector is not needed [2]. I
think we should remove it in order to avoid confusion [3].

The community is also thinking about improving the failure detection mechanism
because in some deployment scenarios additional signals are available
that could help with the detection. So far, however, we haven't made much
progress in this area.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
[2]
https://doc.akka.io/docs/akka-enhancements/current/config-checker.html#transport-failure-detector
[3] https://issues.apache.org/jira/browse/FLINK-22048

Cheers,
Till

On Mon, Mar 29, 2021 at 11:01 PM Sonam Mandal  wrote:

> Hello,
>
> I'm looking for some resources around failure detection in Flink between
> the various components such as Task Manager, Job Manager, Resource Manager,
> etc. For example, how does the Job Manager detect that a Task Manager is
> down (long GC pause or it just crashed)?
>
> There is some indication of the use of heartbeats, is this via Akka death
> watches or custom heartbeat implementation? Reason I ask is because some
> configurations for timeout are AKKA related, whereas others aren't. I would
> like to understand which timeouts are relevant to which pieces.
>
> e.g. akka.transport.heartbeat.interval vs. heartbeat.interval
> I see some earlier posts that mention akka.watch.heartbeat.interval,
> though this is not present on the latest configuration page for Flink.
>
> Also, is this failure detection mechanism the same irrespective of the
> deployment model, i.e. Kubernetes/Yarn/Mesos?
>
> Thanks,
> Sonam
>
>
>


Re: Flink State Query Server threads stuck in infinite loop with high GC activity on CopyOnWriteStateMap get

2021-03-30 Thread Till Rohrmann
Hi Aashutosh,

The queryable state feature is no longer actively maintained by the
community. What I would recommend is to output the aggregate counts via a
sink to some key value store which you query to obtain the results.

Looking at the implementation of CopyOnWriteStateMap, it does not look like
this class is supposed to be accessed concurrently. I suspect that
this is the cause of the infinite loop you are seeing. I think the problem
is that this class was implemented after the development of queryable
state had stopped. Sorry for the inconvenience.

I also pulled in the author of the CopyOnWriteStateMap PengFei Li who might
give more details.

Cheers,
Till

On Mon, Mar 29, 2021 at 2:59 PM Aashutosh Swarnakar 
wrote:

> Hi Folks,
>
>
>
> I've recently started using Flink for a pilot project where I need to
> aggregate event counts on per minute window basis. The state has been made
> queryable so that external services can query the state via Flink State
> Query API. I am using memory state backend with a keyed process function
> and map state.
>
>
>
> I've a simple job running on a 6 node flink standalone cluster. 1 job
> manager and 5 task managers. External services can query the 5 task manager
> nodes for flink state.
>
>
>
> The job operates fine whenever external clients are not querying Flink
> state, but once the external clients start querying the Flink state via the Flink
> queryable client, I observe that the Flink query server threads and the
> aggregate task thread get stuck in an infinite loop in the
> CopyOnWriteStateMap.get() method. GC activity also peaks at 100%, along
> with 100% CPU usage. The task manager nodes are unable to recover from this
> situation and I have to restart the cluster. Let me know if anybody has
> faced this issue before.
>
>
>
> Any information with regards to below queries will be very helpful.
>
>
>
> 1. Is this a thread synchronisation issue ?
>
> 2. Is CopyOnWriteStateMap class thread safe ?
>
> 3. Is there a possibility for any race conditions when incremental
> rehashing is done for CopyOnWriteStateMap ?
>
> 4. Can this be an issue with state usage in my job implementation (I am
> doing a get and put on map state for processing each element in the stream)
> ?
>
>
>
>
>
> I have added the thread dump below along with the code snippet where the
> threads go into infinite loop.
>
>
>
> Task thread:
>
>
>
> "aggregates-stream -> Map -> Sink: Cassandra Sink
> (2/10)#0" - Thread t@76
>
>java.lang.Thread.State: RUNNABLE
>
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:275)
>
> at
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)
>
> at
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:136)
>
> at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:86)
>
> at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>
> at
> com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:44)
>
> at
> com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:20)
>
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$202/2001022910.runDefaultAction(Unknown
> Source)
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>
> at java.lang.Thread.run(Thread.java:748)

flinkSQL + Python UDF problem

2021-03-30 Thread guaishushu1103
After the job has been running for a while, an Apache Beam error appears. Could anyone help take a look?
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3134:
Traceback (most recent call last):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 421, in input_elements
    element = received.get(timeout=1)
  File "/home/yarn/software/python/lib/python3.6/queue.py", line 172, in get
    raise Empty
queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 967, in process_bundle
    expected_inputs):
  File "/home/yarn/software/python/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 424, in input_elements
    raise RuntimeError('Channel closed prematurely.')
RuntimeError: Channel closed prematurely.
guaishushu1...@163.com

StateFun examples in scala

2021-03-30 Thread jose farfan
Hi

I am trying to find some StateFun examples written in Scala.

But I cannot find any.
My questions are:

   1. Is there any problem with using StateFun with Scala?
   2. Is there any place with examples written in Scala?

BR
Jose


Flink fails to write ORC

2021-03-30 Thread Jacob
I consume Kafka messages with the Flink API and write ORC files, and get the following error:
Caused by: org.apache.flink.util.SerializedThrowable
at java.lang.System.arraycopy(Native Method) ~[?:1.8.0_191-ojdkbuild]
at org.apache.hadoop.io.Text.set(Text.java:225) ~[test456.jar:?]
at 
org.apache.orc.impl.StringRedBlackTree.add(StringRedBlackTree.java:59)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StringTreeWriter.writeBatch(StringTreeWriter.java:70)
~[test456.jar:?]
at
org.apache.orc.impl.writer.MapTreeWriter.writeBatch(MapTreeWriter.java:104)
~[test456.jar:?]
at
org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
~[test456.jar:?]
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
~[test456.jar:?]
at 
org.apache.flink.orc.writer.OrcBulkWriter.flush(OrcBulkWriter.java:66)
~[test456.jar:?]
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:59)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:226)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:259)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:240)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:245)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:236)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:86)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:415)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 13 more


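The cause is probably an error while writing data of map type, but I do not know exactly where it goes wrong.

I found a similar error:
https://stackoverflow.com/questions/55246512/error-writing-to-orcnewoutputformat-using-mapr-multipleoutputs

I am not sure what causes this error.
The data is known to be neither empty nor null.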



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


pyflink1.12 error: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0

2021-03-30 Thread xiaoyue
Running a PyFlink UDAF script fails with:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalStateException: Process died with exit code 0.
At the moment the result computed by the UDAF cannot be written to a sink. Has anyone run into this problem?
The exception is as follows:
Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 114, in <module>
csv_source_udaf(csv_source)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 45, in wrapper
func(*args, **kw)
  File "C:/projects/virtual_pyflink1.12/TestScript/local_udaf_logReturn.py", 
line 103, in csv_source_udaf
print(result.to_pandas())
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table.py", 
line 808, in to_pandas
if batches.hasNext():
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o101.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
at 
org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
... 16 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 

Re: Hadoop is not in the classpath/dependencies

2021-03-30 Thread Chesnay Schepler
This looks related to HDFS-12920; where Hadoop 2.X tries to read a 
duration from hdfs-default.xml expecting plain numbers, but in 3.x they 
also contain time units.
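
The mismatch described above can be reproduced in plain Java. The following is a sketch only: `DurationParseDemo` and both of its methods are hypothetical helpers written for illustration and are not Hadoop's actual parsing code. It contrasts a strict numeric parse, which rejects "30s", with a unit-aware parse that accepts it.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Hadoop code: plain-number vs. unit-aware parsing.
class DurationParseDemo {
    // Strict parse, as a reader that expects plain numbers might do.
    static long parsePlain(String value) {
        return Long.parseLong(value);
    }

    // Unit-aware parse: accepts "30", "30s", "5m", "100ms", and so on.
    // Bare numbers are interpreted in defaultUnit. Returns milliseconds.
    static long parseToMillis(String value, TimeUnit defaultUnit) {
        String v = value.trim().toLowerCase();
        int i = 0;
        while (i < v.length() && Character.isDigit(v.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new NumberFormatException("No leading number in: " + value);
        }
        long amount = Long.parseLong(v.substring(0, i));
        TimeUnit unit;
        switch (v.substring(i)) {
            case "":   unit = defaultUnit; break;
            case "ms": unit = TimeUnit.MILLISECONDS; break;
            case "s":  unit = TimeUnit.SECONDS; break;
            case "m":  unit = TimeUnit.MINUTES; break;
            case "h":  unit = TimeUnit.HOURS; break;
            case "d":  unit = TimeUnit.DAYS; break;
            default:
                throw new NumberFormatException("Unknown unit in: " + value);
        }
        return unit.toMillis(amount);
    }

    public static void main(String[] args) {
        try {
            parsePlain("30s");
        } catch (NumberFormatException e) {
            // Same kind of failure as in the reported stack trace.
            System.out.println("strict parse failed: " + e.getMessage());
        }
        System.out.println(parseToMillis("30s", TimeUnit.SECONDS));
    }
}
```

This suggests the error does not come from the job's own 30-second slide interval: it would arise whenever a reader expecting plain numbers meets a configuration value that carries a unit suffix.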


On 3/30/2021 9:37 AM, Matthias Seiler wrote:


Thank you all for the replies!


I did as @Maminspapin suggested and indeed the previous error 
disappeared, but now the exception is

```
java.io.IOException: Cannot instantiate file system for URI: 
hdfs://node-1:9000/flink

//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I thought that it relates to the windowing I do, which has a slide 
interval of 30 seconds, but removing it displays the same error.


I also added the dependency to the maven pom, but without effect.

Since I use Hadoop 3.2.1, I also tried 
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber 
but with this I can't even start a cluster (`TaskManager 
initialization failed`).




@Robert, Flink includes roughly 100 hdfs jars. 
`hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to 
contain `DistributedFileSystem.class`, which I checked running `jar 
tvf hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep 
DistributedFileSystem`. How can I verify that the class is really 
accessible?


Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:

Hey Matthias,

Maybe the classpath contains hadoop libraries, but not the HDFS 
libraries? The "DistributedFileSystem" class needs to be accessible 
to the classloader. Can you check if that class is available?


Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler wrote:


Hello everybody,

I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
machines.
The job should store the checkpoints on HDFS like so:
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
```

Unfortunately, the JobManager throws
```
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not
find a file system implementation for scheme 'hdfs'. The scheme
is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems,
please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
.
// ...
Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is
not in the classpath/dependencies.
```
and I don't understand why.

`echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
wildcards. Flink's JobManager prints the classpath which includes
specific packages from these Hadoop libraries. Besides that, Flink
creates the state directories on HDFS, but no content.

Thank you for any advice,
Matthias





Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

After scanning the user mailing list, I found that some users have reported
checkpoint timeouts when using unaligned checkpoints. Can you share which
checkpoint mode you use? (The information can be found in the log or in the
Checkpoints -> Configuration tab of the web UI.)

Best,
Yingjie

On Tue, Mar 30, 2021 at 4:29 PM Yingjie Cao wrote:

> Hi Haihang,
>
> I think your issue is not related to FLINK-16404, because that change
> should have only a small impact on checkpoint time. We already have a micro
> benchmark for that change (1s checkpoint interval) and no regression is
> seen.
>
> Could you share some more information, for example, the stack of the task
> which can not finish the checkpoint?
>
> Best,
> Yingjie
>
> On Thu, Mar 25, 2021 at 10:58 AM Haihang Jing wrote:
>
>> Hi Congxian, thanks for your reply.
>> job run on Flink1.9 (checkpoint interval 3min)
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
>>
>> job run on Flink1.12 (checkpoint interval 10min)
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
>>
>> job run on Flink1.12 (checkpoint interval 3min)
>> Pic1:Time used to complete the checkpoint in 1.12 is longer(5m32s):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
>>
>> Pic2:Start delay(4m27s):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
>>
>> Pic3:Next checkpoint failed(task141 ack n/a):
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
>>
>> Pic4:Did not see back pressure and data skew:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
>>
>> Pic5:Subtask deal same data nums ,checkpoint endToEnd fast:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
>>
>> Best,
>> Haihang
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: DataStream from kafka topic

2021-03-30 Thread Maminspapin
I tried this:

1. Schema (found in stackoverflow)

class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

    private String registryUrl;
    private transient KafkaAvroDeserializer deserializer;

    public GenericRecordSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        checkInitialized();
        return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (deserializer == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
                            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            deserializer = new KafkaAvroDeserializer(client, props);
        }
    }
}

2. Consumer

private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {

    return new FlinkKafkaConsumer<>(
            topic,
            new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());
}

But when I start the app, the following error occurs:

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.UnsupportedOperationException
at 
java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
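
The root cause at the bottom of this trace is Kryo's CollectionSerializer calling add() on a java.util.Collections unmodifiable wrapper while the record is copied between chained operators. The minimal sketch below reproduces only that JDK behavior, without Kryo or Avro; the class name UnmodifiableAddDemo is made up for illustration. A commonly suggested direction is to keep GenericRecord away from the Kryo fallback by giving Flink Avro type information (for example flink-avro's GenericRecordAvroTypeInfo); treat that as a hint to verify against your Flink version, not as a confirmed fix for this job.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

final class UnmodifiableAddDemo {

    /** Tries to add an element and reports whether the collection accepted it. */
    static boolean tryAdd(Collection<String> target, String element) {
        try {
            target.add(element);
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the "Caused by" entry in the trace above.
            return false;
        }
    }

    public static void main(String[] args) {
        Collection<String> plain = new ArrayList<String>();
        Collection<String> readOnly =
                Collections.unmodifiableCollection(new ArrayList<String>());

        System.out.println("ArrayList accepts add: " + tryAdd(plain, "reserved"));
        System.out.println("Unmodifiable wrapper accepts add: " + tryAdd(readOnly, "reserved"));
    }
}
```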

Re: With the same job configuration, the Flink 1.12 job's checkpoints take longer and eventually fail, while the Flink 1.9 job runs normally

2021-03-30 Thread Yingjie Cao
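This is probably not caused by FLINK-16404. That change has only a small
impact on checkpoint time; there is already a benchmark for it, and even a 1s
checkpoint interval shows no real problem. I suggest looking at the stack of
the failing task to see what it is doing; that will likely be the faster way
to track down the problem.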

On Wed, Mar 24, 2021 at 12:06 PM Haihang Jing wrote:

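> [Symptom] Two jobs with the same configuration (checkpoint interval: 3
> minutes; job logic: regular join): on Flink 1.9 the job runs normally; on
> Flink 1.12, after running for a while, checkpoint duration grows and
> checkpoints eventually fail.
>
>
> [Analysis] I understand that the checkpoint mechanism was changed after
> Flink 1.10: during barrier alignment the receiver no longer buffers data
> that arrives after a single barrier, which means the sender has to wait for
> credit feedback after barrier alignment before it can transmit data. The
> sender therefore goes through a kind of cold start, which affects latency
> and network throughput. I changed the checkpoint interval to 10 minutes for
> a comparison test and found that, with the 10-minute interval, the job on
> Flink 1.12 runs normally.
> Related issue: https://issues.apache.org/jira/browse/FLINK-16404
>
> [Questions] 1. Has anyone run into the same situation?
> 2. How large is the impact of the checkpoint interval on a Flink 1.12 job?
> Are there any official tests?
>
> The Flink 1.12 job with a 3-minute checkpoint interval failed checkpointing
> after running for 5 hours. The exception stack: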
>
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
>
> at
>
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>


Re: With the checkpoint interval of the same size, the Flink 1.12 version of the job checkpoint time-consuming increase and production failure, the Flink1.9 job is running normally

2021-03-30 Thread Yingjie Cao
Hi Haihang,

I think your issue is not related to FLINK-16404, because that change
should have only a small impact on checkpoint time. We already have a micro
benchmark for that change (1s checkpoint interval) and no regression is
seen.

Could you share some more information, for example, the stack of the task
which cannot finish the checkpoint?

Best,
Yingjie

On Thu, Mar 25, 2021 at 10:58 AM Haihang Jing wrote:

> Hi Congxian, thanks for your reply.
> job run on Flink1.9 (checkpoint interval 3min)
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/6.png>
>
> job run on Flink1.12 (checkpoint interval 10min)
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/7.png>
>
> job run on Flink1.12 (checkpoint interval 3min)
> Pic1:Time used to complete the checkpoint in 1.12 is longer(5m32s):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/2.png>
>
> Pic2:Start delay(4m27s):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/1.png>
>
> Pic3:Next checkpoint failed(task141 ack n/a):
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/3.png>
>
> Pic4: No back pressure or data skew observed:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/4.png>
>
> Pic5: Subtasks handle the same number of records, and checkpoint end-to-end time is fast:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3050/5.png>
>
> Best,
> Haihang
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: flink sql count distinct optimization

2021-03-30 Thread Robin Zhang
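Hi guomuhua,
   `The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled `
The official documentation has this passage about local aggregation. In other
words, mini-batch aggregation has to be enabled first before local aggregation
can be used, which makes three parameters in total:
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
 In your question you only enabled the single local-aggregation parameter; I'm
not sure whether you just didn't write out the full configuration.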
Best,
Robin




guomuhua wrote
> In SQL, if the local-global parameter is enabled: set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> or the Partial-Final parameters are enabled: set table.optimizer.distinct-agg.split.enabled=true;
>  set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> do I still need to rewrite the SQL into the two-stage form myself?
> For example:
> Original SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day
> 
> SQL after spreading the DISTINCT field buy_id across buckets by modulo 1024:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
> 
> Or will Flink rewrite the SQL for me automatically, so that I don't need to care?
> 
> Also, with only the parameters above enabled and the SQL not rewritten, there seems to be no optimization, and I don't see a two-phase operator in the Flink web UI either.
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
>  
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
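
To make the two-stage form in the question concrete, here is a small plain-Java simulation (no Flink involved) of what the rewritten query computes: rows are first grouped by (day, MOD(buy_id, 1024)) with a distinct count per bucket, and the per-bucket counts are then summed per day. Because every buy_id falls into exactly one bucket, the sum equals the direct COUNT(DISTINCT buy_id). The class name, the Row type, and the sample data are made up for illustration; this does not show how Flink's planner implements the split.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SplitDistinctSketch {

    /** One input row of table T: (day, buy_id). */
    static final class Row {
        final String day;
        final long buyId;

        Row(String day, long buyId) {
            this.day = day;
            this.buyId = buyId;
        }
    }

    /** SELECT day, COUNT(DISTINCT buy_id) FROM T GROUP BY day */
    static Map<String, Long> directCount(List<Row> rows) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Row r : rows) {
            seen.computeIfAbsent(r.day, d -> new HashSet<>()).add(r.buyId);
        }
        Map<String, Long> result = new HashMap<>();
        seen.forEach((day, ids) -> result.put(day, (long) ids.size()));
        return result;
    }

    /** Inner query groups by (day, bucket); outer query sums the bucket counts per day. */
    static Map<String, Long> twoStageCount(List<Row> rows, int bucketNum) {
        // Stage 1: distinct buy_ids per (day, bucket).
        Map<String, Map<Long, Set<Long>>> partial = new HashMap<>();
        for (Row r : rows) {
            long bucket = Math.floorMod(r.buyId, (long) bucketNum);
            partial.computeIfAbsent(r.day, d -> new HashMap<>())
                    .computeIfAbsent(bucket, b -> new HashSet<>())
                    .add(r.buyId);
        }
        // Stage 2: SUM(cnt) per day.
        Map<String, Long> result = new HashMap<>();
        partial.forEach((day, buckets) -> {
            long total = 0;
            for (Set<Long> ids : buckets.values()) {
                total += ids.size();
            }
            result.put(day, total);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("2021-03-24", 1L), new Row("2021-03-24", 1L),
                new Row("2021-03-24", 1025L), new Row("2021-03-25", 7L));
        System.out.println("direct:    " + directCount(rows));
        System.out.println("two-stage: " + twoStageCount(rows, 1024));
    }
}
```

Both methods return the same per-day counts for any input, which is why the manual rewrite and the single-statement query are interchangeable in terms of results; the difference is only in how the work is spread.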





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count distinct optimization

2021-03-30 Thread Robin Zhang
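Hi Jark,
   As I understand it, the SQL in the question is an ordinary agg operation,
except that the grouping key is a time field. I'm not sure how to interpret
your remark `I see that your job uses window agg`.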

Best,
Robin



Jark wrote
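>> If it is not window agg, Flink will spread the data automatically once the parameters are enabled, right?
> Yes.
> 
>> As for window agg not being split automatically, can a description of that be found in the documentation?
> The documentation does not mention it. Everything in this document [1] is about optimizations for unbounded agg.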
> 
> Best,
> Jark
> 
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> 
> On Fri, 26 Mar 2021 at 11:00, guomuhua <

> 663021157@

>> wrote:
> 
>> Jark wrote
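>> > I see that your job uses window agg; window agg currently does not support
>> > automatic splitting. In 1.13, window agg based on window TVF supports this
>> > parameter. Something to look forward to.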
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
>>
>> > vincent2015qdlg@
>>
>> > 
>> > wrote:
>> >
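>> >> Hi guomuhua,
>> >>   With local aggregation enabled, you don't need to spread the keys and do a
>> >> second aggregation yourself. I suggest reading the official documentation.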
>> >>
>> >> Best,
>> >> Robin
>> >>
>> >>
>> >> guomuhua wrote
>> >> > In SQL, if the local-global parameter is enabled: set
>> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> >> > or the Partial-Final parameters are enabled: set
>> >> > table.optimizer.distinct-agg.split.enabled=true;
>> >> >  set
>> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> >> > do I still need to rewrite the SQL into the two-stage form myself?
>> >> > For example:
>> >> > Original SQL:
>> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day
>> >> >
>> >> > SQL after spreading the DISTINCT field buy_id across buckets by modulo 1024:
>> >> > SELECT day, SUM(cnt) total
>> >> > FROM (
>> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> >> > GROUP BY day
>> >> >
>> >> > Or will Flink rewrite the SQL for me automatically, so that I don't need to care?
>> >> >
>> >> > Also, with only the parameters above enabled and the SQL not rewritten, there seems to be no optimization, and I don't see a two-phase operator in the Flink web UI either.
>> >> > http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
>> >>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
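>> Thanks. If it is not window agg, Flink will spread the data automatically once
>> the parameters are enabled, right? As for window agg not being split
>> automatically, can a description of this be found in the documentation? Where
>> exactly? Or do I need to look in the source code? Any pointers are appreciated.
>> Thanks again.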
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink exception when reading data from mysql

2021-03-30 Thread 张锴
The error message states clearly that only insert is supported.

On Tue, Mar 30, 2021 at 10:32 AM air23 wrote:

> Hello, I followed the official documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
> Reading MySQL data over JDBC fails with: Exception in thread "main"
> org.apache.flink.table.api.TableException: Only insert statement is
> supported now.
>
>
> String  a = "-- register a MySQL table 'users' in Flink SQL\n" +
> "CREATE TABLE MyUserTable (\n" +
> "  id BIGINT\n" +
> ") WITH (\n" +
> "   'connector' = 'jdbc',\n" +
> "   'url' = 'jdbc:mysql://***:3306/monitor',\n" +
> "   'table-name' = 't1',\n" +
> "   'username' = 'root',\n" +
> "   'password' = '***'\n" +
> ") ";
>
> String b ="-- scan data from the JDBC table\n" +
> "SELECT id FROM MyUserTable\n";
>
> tEnv.executeSql(a);
>
>
>
> Is it not possible to read data from MySQL?
>
>
>
>
>


Re: Evenly distribute task slots across task-manager

2021-03-30 Thread yidan zhao
I think Flink currently doesn't support your case. Another idea is to set
the parallelism of all operators to 64; the tasks will then be evenly
distributed across the two TaskManagers.

On Thu, Mar 25, 2021 at 1:05 AM Vignesh Ramesh wrote:

> Hi Matthias,
>
> Thanks for your reply. In my case, yes the upstream operator for the
> operator which is not distributed evenly among task managers is a flink
> Kafka connector with a rebalance(shuffling).
>
> Regards,
> Vignesh
>
> On Tue, 23 Mar, 2021, 6:48 pm Matthias Pohl, 
> wrote:
>
>> There was a similar discussion recently in this mailing list about
>> distributing the work onto different TaskManagers. One finding Xintong
>> shared there [1] was that the parameter cluster.evenly-spread-out-slots is
>> used to evenly allocate slots among TaskManagers but not how the tasks are
>> actually distributed among the allocated slots. It would be interesting to
>> know more about your job. If the upstream operator does some shuffling, you
>> might run into the issue of the task executions not being distributed
>> evenly anymore.
>>
>> Matthias
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evenly-Spreading-Out-Source-Tasks-tp42108p42235.html
>>
>> On Tue, Mar 23, 2021 at 1:42 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Vignesh,
>>> are you trying to achieve an even distribution of tasks for this one
>>> operator that has the parallelism set to 16? Or do you observe the
>>> described behavior also on a job level?
>>> I'm adding Chesnay to the thread as he might have more insights on this
>>> topic.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Mon, Mar 22, 2021 at 6:31 PM Vignesh Ramesh 
>>> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> Can someone help me with a solution?
>>>>
>>>> I have a Flink job (2 task managers) with a job parallelism of 64 and
>>>> 64 task slots.
>>>> One of the operators has its parallelism set to 16. The slots for this
>>>> operator are not evenly distributed across the two task managers. It
>>>> often takes more task slots, like 10/11, in one task manager and 5/6 in
>>>> the other task manager.
>>>>
>>>> I'm using Flink version 1.11.2. I tried adding
>>>> cluster.evenly-spread-out-slots: true but it didn't work. Any solution
>>>> is greatly appreciated.
>>>>
>>>> Thanks in advance,
>>>>
>>>> Regards,
>>>> Vignesh




(No subject)

2021-03-30 Thread 高耀军
Unsubscribe


高耀军
Email: 18221112...@163.com


Re: Hadoop is not in the classpath/dependencies

2021-03-30 Thread Matthias Seiler
Thank you all for the replies!


I did as @Maminspapin suggested and indeed the previous error
disappeared, but now the exception is
```
java.io.IOException: Cannot instantiate file system for URI:
hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I thought that it relates to the windowing I do, which has a slide
interval of 30 seconds, but removing it displays the same error.
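
The message format matches what Long.parseLong produces for a value with a unit suffix, so one plausible reading (an assumption, not confirmed in this thread) is that a configuration value such as 30s is being read by code that expects a bare number, for instance when Hadoop 2.x client classes read Hadoop 3.x configuration defaults. The JDK-only sketch below reproduces just the parsing failure; DurationParseSketch is a made-up name.

```java
final class DurationParseSketch {

    /** Returns the parsed value as text, or the exception message if the input is not a bare number. */
    static String parseOrExplain(String text) {
        try {
            return String.valueOf(Long.parseLong(text));
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A bare number parses fine.
        System.out.println(parseOrExplain("30"));
        // A unit suffix makes Long.parseLong fail with a NumberFormatException.
        System.out.println(parseOrExplain("30s"));
    }
}
```

If this reading applies, the windowing code would be unrelated, which would be consistent with the error persisting after the window was removed.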

I also added the dependency to the maven pom, but without effect.

Since I use Hadoop 3.2.1, I also tried
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber
but with this I can't even start a cluster (`TaskManager initialization
failed`).



@Robert, Flink includes roughly 100 hdfs jars.
`hadoop-hdfs-client-3.2.1.jar` is one of them and is supposed to contain
`DistributedFileSystem.class`, which I checked running `jar tvf
hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep
DistributedFileSystem`. How can I verify that the class is really
accessible?
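
A possible way to check this, sketched under the assumption that the probe runs with the same classpath as the Flink process: try to load the class by name and print where it was loaded from. The helper uses only JDK calls; the class name ClasspathProbe and its method are made up for illustration. For the HDFS case the name to probe would be org.apache.hadoop.hdfs.DistributedFileSystem.

```java
import java.security.CodeSource;

final class ClasspathProbe {

    /** Returns a description of where the class was loaded from, or null if it cannot be loaded. */
    static String locate(String className) {
        try {
            // initialize=false: only check that the class can be found, do not run static initializers.
            Class<?> clazz = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return source == null ? "(bootstrap / JDK)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.apache.hadoop.hdfs.DistributedFileSystem";
        String location = locate(name);
        System.out.println(location == null
                ? name + " is NOT loadable from this classpath"
                : name + " loaded from " + location);
    }
}
```

Note that `jar tvf` only shows that the class exists in a jar on disk; a probe like this shows whether a given classloader can actually see it.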

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:
> Hey Matthias,
>
> Maybe the classpath contains hadoop libraries, but not the HDFS
> libraries? The "DistributedFileSystem" class needs to be accessible to
> the classloader. Can you check if that class is available?
>
> Best,
> Robert
>
> On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler wrote:
>
> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
> machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
> .
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>


Unsubscribe

2021-03-30 Thread 徐永健
Unsubscribe

Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-30 Thread Dawid Wysakowicz
Hey,

I am not sure which format you use, but if you work with JSON maybe this
option[1] could help you.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard

On 30/03/2021 06:45, Sumeet Malhotra wrote:
> Thanks. Yes, that's a possibility. I'd still prefer something that can
> be done within the Table API. If it's not possible, then there's no
> other option but to use the DataStream API to read from Kafka, do the
> time conversion and create a table from it.
>
> ..Sumeet
>
> On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski wrote:
>
> Hi,
>
> I hope someone else might have a better answer, but one thing that
> would most likely work is to convert this field and define event
> time during DataStream to table conversion [1]. You could always
> preprocess this field in the DataStream API.
>
> Piotrek
>
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
> 
> 
>
> On Mon, Mar 29, 2021 at 18:07 Sumeet Malhotra
> <sumeet.malho...@gmail.com> wrote:
>
> Hi,
>
> Might be a simple, stupid question, but I'm not able to find
> how to convert/interpret a UTC datetime string
> like *2021-03-23T07:37:00.613910Z* as event-time using a
> DDL/Table API. I'm ingesting data from Kafka and can read this
> field as a string, but would like to mark it as event-time by
> defining a watermark.
>
> I'm able to achieve this using the DataStream API, by defining
> my own TimestampAssigner that converts the datetime string to
> milliseconds since epoch. How can I do this using a SQL DDL or
> Table API?
>
> I tried to directly interpret the string as TIMESTAMP(3) but
> it fails with the following exception:
>
> java.time.format.DateTimeParseException: Text
> '2021-03-23T07:37:00.613910Z' could not be parsed...
>
> Any pointers?
>
> Thanks!
> Sumeet
>




Support for sending generic class

2021-03-30 Thread Le Xu
Hello!

I'm trying to figure out whether Flink Statefun supports sending objects
whose class has generic type parameters (and potentially nested types).
For example, I send a message that looks like this:

context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
listOfLongObject, Long));

And obviously I'm getting complaints like this:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
TypeInformation from Class alone, because generic parameters are missing.
Please use TypeInformation.of(TypeHint) instead, or another equivalent
method in the API that accepts a TypeHint instead of a Class. For example
for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
at
org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
at
org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at
org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
at
org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
at
org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
at
org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
at
org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
at
org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
at
benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)


Is there any API in Statefun that supports parameterized classes like
this, or does the user function need to handle the serialization process
itself? Or is there any way to quickly modify the Statefun message
interface to support this functionality?

Thanks!

Le
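
The exception message points at type erasure: a Class object for Tuple3 carries no information about its type arguments, whereas an anonymous subclass of a generic type records them in its superclass signature, which is the trick that TypeHint-style APIs rely on. The sketch below shows that mechanism with the JDK only; TypeCapture is a made-up stand-in, not Flink's TypeHint, and whether Statefun exposes a hook to pass such a hint for messages is not answered here.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

/** Made-up stand-in for a TypeHint-style class; captures T through an anonymous subclass. */
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // For a direct anonymous subclass, the generic superclass is
        // TypeCapture<actual type argument>.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    Type capturedType() {
        return captured;
    }
}

final class TypeCaptureDemo {
    public static void main(String[] args) {
        List<Long> longs = new ArrayList<>();
        // Erasure: the runtime class knows nothing about <Long>.
        System.out.println(longs.getClass().getName());

        // The anonymous subclass keeps the full parameterized type.
        Type full = new TypeCapture<List<Long>>() {}.capturedType();
        System.out.println(full.getTypeName());
    }
}
```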