Flink to get historical data from kafka between timespan t1 & t2

2021-01-07 Thread VINAY.RAICHUR
Hi Flink Community Team,

This is an urgent request for your help with the items below.

I am new to Flink and trying to use it with Kafka for event-based data stream 
processing in my project. I am struggling to find Flink solutions for the 
following project requirements:


  1.  Get all Kafka topic records at a given point in time 't' (now or in the 
past). Also, how can only the latest record be pulled from Kafka using Flink?
  2.  Get all records from Kafka for a given time interval in the past, 
between times t1 and t2.
  3.  Continuously get data from Kafka starting at a given point in time (now 
or in the past); the client will actively cancel/close the data stream. 
Example: live dashboards. How can this be done with Flink?
Please provide a sample Flink code snippet for pulling data from Kafka for each 
of the three requirements above. I have been stuck for the last month without 
much progress, and your timely help would be a lifesaver!
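
For requirement 2, a minimal sketch along these lines may help (an illustrative example, not an official recipe): it assumes the FlinkKafkaConsumer from flink-connector-kafka, and the topic, servers, timestamps, and the extractEventTime() helper are placeholders. The consumer has no built-in end offset, so the upper bound t2 is enforced in user code.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaTimeRangeJob {

    public static void main(String[] args) throws Exception {
        final long t1 = 1609977600000L; // start of the range, epoch millis (placeholder)
        final long t2 = 1609981200000L; // end of the range, epoch millis (placeholder)

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "history-reader");      // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromTimestamp(t1);  // requirements 2 and 3: start at a given point in time
        // consumer.setStartFromLatest();    // requirement 1: only records arriving from now on

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer)
                // no built-in end offset, so the upper bound t2 is enforced by a user-side filter
                .filter(value -> extractEventTime(value) <= t2)
                .print();
        env.execute("kafka-time-range");
    }

    // hypothetical helper: assumes the payload starts with an epoch-millis timestamp field
    private static long extractEventTime(String value) {
        return Long.parseLong(value.split(",")[0]);
    }
}

For requirement 3 the same setStartFromTimestamp(t1) start position applies; the job then simply keeps running until the client cancels it.
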
Thanks & Regards,
Vinay Raichur
T-Systems India | Digital Solutions
Mail: vinay.raic...@t-systems.com
Mobile: +91 9739488992



Re: Don't Flink operators have an operation like Spark's cache?

2021-01-07 Thread 张锴
You can store intermediate results in state.

李继 wrote on Thu, Jan 7, 2021 at 5:42 PM:

> Hi, when an operator's result is used multiple times, how can it be cached, similar to Spark's cache operation?
>
> val env = getBatchEnv
> val ds = env.fromElements("a","b","c")
>
> val ds2 = ds.map(x=>{
>   println("map op")
>   x.charAt(0).toInt+1
> })
>
> // this prints "map op" three times
> ds2.print()
>
> // this prints "map op" three more times
> ds2.filter(_>100).print()
>


Re: Re: sql-client configured with Hive fails to start

2021-01-07 Thread amenhub
OK, thanks.



 
From: Rui Li
Sent: 2021-01-08 11:42
To: user-zh
Subject: Re: Re: sql-client configured with Hive fails to start
Hi,
 
With the Table API you can set Flink's security options to specify the principal and keytab [1].
For SQL Client mode, try doing a manual kinit before starting it and see whether that works.
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems
 
On Fri, Jan 8, 2021 at 10:06 AM amenhub  wrote:
 
> Ah, it is indeed a Kerberos-enabled HMS. Is there any other workaround?
>
>
>
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client configured with Hive fails to start
> Is the HMS secured with Kerberos?
> The community Hive connector does not currently support accessing a Kerberos-enabled HMS
>
>
>
>
> On 2021-01-07 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh
> embedded
>
> Error message: https://imgchr.com/i/smQrlj
>
> Flink version: 1.12
> Hive version: 3.1.0
>
>
>
>
 
-- 
Best regards!
Rui Li


How should I process a cumulative counter?

2021-01-07 Thread Larry Aspen
Hi,

I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (sensor records values when it starts, every 15 minutes
and on hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value is only part of the next window and the delta
of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next hour is later (e.g. 12:45:00, 14:00:00
i.e. machine is shutdown between 12:45:00 - 13:00:00 and started at
14:00:00).

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect as
older values are received after newer values which causes them to be
evicted.

Any advice on this would be appreciated.
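
One possible direction, as a rough sketch only (this is not from the thread): keep the previous reading per sensor in keyed state, emit one delta per consecutive pair, attribute each delta to the earlier reading's timestamp so that it falls into the hour it belongs to, and then sum the deltas in a tumbling event-time window. The Reading POJO and all names are illustrative, and the sketch assumes each sensor's readings reach the delta step in order.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CumulativeCounterExample {

    // simple POJO; in a real job this would be the deserialized sensor reading
    public static class Reading {
        public String sensorId;
        public long timestamp;      // epoch millis
        public double counterValue; // cumulative counter, or a delta downstream of DeltaFunction
        public Reading() {}
        public Reading(String sensorId, long timestamp, double counterValue) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.counterValue = counterValue;
        }
    }

    // emits one delta per consecutive pair of readings, timestamped at the earlier reading
    public static class DeltaFunction extends KeyedProcessFunction<String, Reading, Reading> {
        private transient ValueState<Reading> previous;

        @Override
        public void open(Configuration parameters) {
            previous = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("previous", Reading.class));
        }

        @Override
        public void processElement(Reading current, Context ctx, Collector<Reading> out)
                throws Exception {
            Reading prev = previous.value();
            if (prev != null) {
                out.collect(new Reading(prev.sensorId, prev.timestamp,
                        current.counterValue - prev.counterValue));
            }
            previous.update(current);
        }
    }

    // usage sketch (assuming a DataStream<Reading> called readings):
    // readings.keyBy(r -> r.sensorId)
    //         .process(new DeltaFunction())
    //         .assignTimestampsAndWatermarks(...)  // take event time from the delta's timestamp field
    //         .keyBy(r -> r.sensorId)
    //         .window(TumblingEventTimeWindows.of(Time.hours(1)))
    //         .reduce((a, b) -> new Reading(a.sensorId, a.timestamp, a.counterValue + b.counterValue));
}
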

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html


Re: Using Chinese characters in a WHERE condition makes queried fields come back as Unicode escapes

2021-01-07 Thread spike
I ran into the same problem, but there doesn't seem to be an issue tracking it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.12.0 native K8s deployment fails to start

2021-01-07 Thread yzxs
1. The job was submitted with the following command:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
   
-Dkubernetes.container.image=registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
\
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.container-start-command-template="%java% %classpath%
%jvmmem% %jvmopts% %logging% %class% %args%" \
local:///opt/flink/usrlib/WordCount.jar

2. After the job is submitted, the pod keeps failing to start; kubectl logs shows the following error:
/docker-entrypoint.sh: 125: exec: native-k8s: not found

3. I checked the image's docker-entrypoint.sh script and there is no native-k8s command. The image is built on top of the latest Flink image, with the following Dockerfile:
FROM flink:latest
RUN mkdir -p /opt/flink/usrlib
COPY ./WordCount.jar /opt/flink/usrlib/WordCount.jar

4. Pod describe output:
Name: my-first-application-cluster-59c4445df4-4ss2m
Namespace:default
Priority: 0
Node: minikube/192.168.64.2
Start Time:   Wed, 23 Dec 2020 17:06:02 +0800
Labels:   app=my-first-application-cluster
  component=jobmanager
  pod-template-hash=59c4445df4
  type=flink-native-kubernetes
Annotations:  
Status:   Running
IP:   172.17.0.3
IPs:
  IP:   172.17.0.3
Controlled By:  ReplicaSet/my-first-application-cluster-59c4445df4
Containers:
  flink-job-manager:
Container ID: 
docker://b8e5759488af5fd3e3273f69d42890d9750d430cbd6e18b1d024ab83293d0124
Image: registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
Image ID: 
docker-pullable://registry.cn-shenzhen.aliyuncs.com/syni_test/flink@sha256:53a2cec0d0a532aa5d79c241acfdd13accb9df78eb951eb4e878485174186aa8
Ports: 8081/TCP, 6123/TCP, 6124/TCP
Host Ports:0/TCP, 0/TCP, 0/TCP
Command:
  /docker-entrypoint.sh
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
-Xms1073741824 -XX:MaxMetaspaceSize=268435456
-Dlog.file=/opt/flink/log/jobmanager.log
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
-D jobmanager.memory.off-heap.size=134217728b -D
jobmanager.memory.jvm-overhead.min=201326592b -D
jobmanager.memory.jvm-metaspace.size=268435456b -D
jobmanager.memory.heap.size=1073741824b -D
jobmanager.memory.jvm-overhead.max=201326592b
State:  Waiting
  Reason:   CrashLoopBackOff
Last State: Terminated
  Reason:   Error
  Exit Code:127
  Started:  Wed, 23 Dec 2020 17:37:28 +0800
  Finished: Wed, 23 Dec 2020 17:37:28 +0800
Ready:  False
Restart Count:  11
Limits:
  cpu: 1
  memory:  1600Mi
Requests:
  cpu: 1
  memory:  1600Mi
Environment:
  _POD_IP_ADDRESS:   (v1:status.podIP)
Mounts:
  /opt/flink/conf from flink-config-volume (rw)
  /var/run/secrets/kubernetes.io/serviceaccount from default-token-9hdqt
(ro)
Conditions:
  Type  Status
  Initialized   True 
  Ready False 
  ContainersReady   False 
  PodScheduled  True 
Volumes:
  flink-config-volume:
Type:  ConfigMap (a volume populated by a ConfigMap)
Name:  flink-config-my-first-application-cluster
Optional:  false
  default-token-9hdqt:
Type:Secret (a volume populated by a Secret)
SecretName:  default-token-9hdqt
Optional:false
QoS Class:   Guaranteed
Node-Selectors:  
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type Reason Age  From   Message
   --     ---
  Normal   Scheduled  15d  default-scheduler  Successfully
assigned default/my-first-application-cluster-59c4445df4-4ss2m to minikube
  Normal   Pulled 15d  kubeletSuccessfully
pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
513.7913ms
  Normal   Pulled 15d  kubeletSuccessfully
pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
374.1125ms
  Normal   Pulled 15d  kubeletSuccessfully
pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
360.6719ms
  Normal   Created15d (x4 over 15d)kubeletCreated
container flink-job-manager
  Normal   Started15d (x4 over 15d)kubeletStarted
container flink-job-manager
  Normal   Pulled 15d  kubeletSuccessfully
pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in
374.2637ms
  Normal   Pulling15d (x5 over 15d)kubeletPulling image
"registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1"
  Warning  BackOff15d (x141 

Re: Re: sql-client configured with Hive fails to start

2021-01-07 Thread Rui Li
Hi,

With the Table API you can set Flink's security options to specify the principal and keytab [1].
For SQL Client mode, try doing a manual kinit before starting it and see whether that works.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems

On Fri, Jan 8, 2021 at 10:06 AM amenhub  wrote:

> Ah, it is indeed a Kerberos-enabled HMS. Is there any other workaround?
>
>
>
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client configured with Hive fails to start
> Is the HMS secured with Kerberos?
> The community Hive connector does not currently support accessing a Kerberos-enabled HMS
>
>
>
>
> On 2021-01-07 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh
> embedded
>
> Error message: https://imgchr.com/i/smQrlj
>
> Flink version: 1.12
> Hive version: 3.1.0
>
>
>
>

-- 
Best regards!
Rui Li


Re: Flink 1.11.2 streaming job fails with: is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container

2021-01-07 Thread Yun Tang
Hi,

This may be off-heap memory overuse. You can refer to a recent Chinese-community article, "Flink OOM Killed in 
Containerized Environments Explained", and adjust accordingly; I'd suggest increasing the jvm-overhead related options first.

[1] 
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247490197=1=b0893a9bf12fbcae76852a156302de95

Best regards,
Yun Tang

From: Yang Peng 
Sent: Thursday, January 7, 2021 12:24
To: user-zh 
Subject: Flink 1.11.2 streaming job fails with: is running beyond physical memory limits. 
Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB 
virtual memory used. Killing container

Hi,

 
Hi all, a question: we have a streaming job running on Flink 1.11.2 with the RocksDB state backend. Recently we got alerts that the container was killed for exceeding its physical memory limit. From the monitoring, the TaskManager
heap usage did not exceed its limit, direct memory usage also stayed within a stable range, and no OOM error was reported. Does this mean it is a non-heap (native) memory issue? The full error message is below:

Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address=flink-cm8.jd.163.org -Dweb.port=0
-Dweb.tmpdir=/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c
-Djobmanager.rpc.port=33656 -Drest.address=flink-cm8.jd.163.org
-Dsecurity.kerberos.login.keytab=/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab
|- 180362 180360 180362 180362 (bash) 0 2 116011008 353 /bin/bash -c
/usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608
-Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address='flink-cm8.jd.163.org' -Dweb.port='0'
-Dweb.tmpdir='/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c'
-Djobmanager.rpc.port='33656' -Drest.address='flink-cm8.jd.163.org'
-Dsecurity.kerberos.login.keytab='/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab'
1> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.out
2> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2021-01-07 11:51:00,781 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
Source: 银河SDK原始日志 (18/90) (51ac2f29df472d001ce9b4307636ac1c) switched
from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@1aad00fa.
java.lang.Exception: Container
[pid=180362,containerID=container_e06_1603181034156_0137_01_02] is
running beyond physical memory limits. Current usage: 25.0 GB of 25 GB
physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D

Re: Re: Re: Re: Which metric to alert on when monitoring Flink jobs with Grafana

2021-01-07 Thread Yun Tang
Since numRestarts is a cumulative value, you have to check whether the current value has increased compared to the previous one in order to tell whether a failover has occurred.

Also, I don't recommend using the YARN application state to judge the Flink job's state: if the Flink job has a restart strategy configured, the YARN application stays RUNNING even while the job keeps failing over, so the problem would not be detected.

Best regards,
Yun Tang

From: bradyMk 
Sent: Thursday, January 7, 2021 16:38
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: Re: Which metric to alert on when monitoring Flink jobs with Grafana

OK, I'll look into it. Thanks for the guidance!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink taskmanager id

2021-01-07 Thread Deshpande, Omkar
Hello,

I use Flink on Kubernetes, and the TaskManagers get assigned random UUIDs. Is 
there a way to explicitly configure them to use hostnames instead?


Omkar


Re: How to submit SQL jobs

2021-01-07 Thread 林影
Can Zeppelin 0.9 already act as a job server for Flink?
Last time I heard Jeff Zhang say at the Apsara Conference that this was planned for later; is it already usable now?

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:

> You can try Zeppelin 0.9
> http://zeppelin.apache.org/
>
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
>
> > At my company, SQL jobs are still submitted with Jark's flink-sql-submit project. A few questions:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Now that Flink 1.12 has unified batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
>


Fw: How to specify the number of TaskManagers when submitting a job with Flink on K8s

2021-01-07 Thread 旧城以西












2021-01-08 09:47:31,636 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of 
job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 
Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_265]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]




---------- Forwarded message ----------
From: "旧城以西" 
Sent: 2021-01-08 10:12:45
To: "user-zh@flink.apache.org" 
Subject: How to specify the number of TaskManagers when submitting a job with Flink on K8s

Hi all,
 I am running a Flink cluster deployed with Flink on K8s in session mode; the cluster has 3 TaskManagers, each 
with 8 slots. When I submit a job with parallelism 3, the job always ends up on a single TaskManager, which causes data skew, and checkpoints never succeed. On Flink
 on K8s, is there a parameter similar to the YARN slots-per-TaskManager option to control how many TaskManagers are used?


Flink TaskManager configuration
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
matchLabels:
  app: flink-taskmanager
  template:
metadata:
  labels:
app: flink-taskmanager
spec:
  containers:
  - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
name: flink-taskmanager
ports:
- containerPort: 8081
  name: flink-task
workingDir: /opt/flink
args:
- taskmanager
env:
- name: JOB_MANAGER_RPC_ADDRESS
  value: flink-jobmanager-svc
- name: TZ
  value: "Asia/Shanghai"
resources:
  requests:
cpu: 1200m
memory: 1024Mi
  limits:
cpu: 2000m
memory: 2048Mi
volumeMounts:
- name: flink-taskmanager-pv
  mountPath: /opt/flink/conf
- name: flink-jobmanager-lib-pv
  mountPath: /opt/flink/lib
  volumes:
  - name: flink-taskmanager-pv
persistentVolumeClaim:
  claimName: flink-taskmanager-pvc
  - name: flink-jobmanager-lib-pv
persistentVolumeClaim:
  claimName: flink-jobmanager-lib-pvc
  imagePullSecrets:
- name: registrysecret


Flink code
// obtain the Flink execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// configure checkpointing
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 

How to specify the number of TaskManagers when submitting a job with Flink on K8s

2021-01-07 Thread 旧城以西
Hi all,
 I am running a Flink cluster deployed with Flink on K8s in session mode; the cluster has 3 TaskManagers, each 
with 8 slots. When I submit a job with parallelism 3, the job always ends up on a single TaskManager, which causes data skew, and checkpoints never succeed. On Flink
 on K8s, is there a parameter similar to the YARN slots-per-TaskManager option to control how many TaskManagers are used?


Flink TaskManager configuration
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
matchLabels:
  app: flink-taskmanager
  template:
metadata:
  labels:
app: flink-taskmanager
spec:
  containers:
  - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
name: flink-taskmanager
ports:
- containerPort: 8081
  name: flink-task
workingDir: /opt/flink
args:
- taskmanager
env:
- name: JOB_MANAGER_RPC_ADDRESS
  value: flink-jobmanager-svc
- name: TZ
  value: "Asia/Shanghai"
resources:
  requests:
cpu: 1200m
memory: 1024Mi
  limits:
cpu: 2000m
memory: 2048Mi
volumeMounts:
- name: flink-taskmanager-pv
  mountPath: /opt/flink/conf
- name: flink-jobmanager-lib-pv
  mountPath: /opt/flink/lib
  volumes:
  - name: flink-taskmanager-pv
persistentVolumeClaim:
  claimName: flink-taskmanager-pvc
  - name: flink-jobmanager-lib-pv
persistentVolumeClaim:
  claimName: flink-jobmanager-lib-pvc
  imagePullSecrets:
- name: registrysecret


Flink code
// obtain the Flink execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// configure checkpointing
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoint interval
env.enableCheckpointing(1000*60);
// interval at which watermarks are generated
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
org.apache.flink.api.common.time.Time.minutes(1)));


Re: Re: sql-client configured with Hive fails to start

2021-01-07 Thread amenhub
Ah, it is indeed a Kerberos-enabled HMS. Is there any other workaround?



 
From: 叶贤勋
Sent: 2021-01-08 10:03
To: user-zh@flink.apache.org
Subject: Re: sql-client configured with Hive fails to start
Is the HMS secured with Kerberos?
The community Hive connector does not currently support accessing a Kerberos-enabled HMS
 
 
 
 
On 2021-01-07 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh 
embedded
 
Error message: https://imgchr.com/i/smQrlj
 
Flink version: 1.12
Hive version: 3.1.0
 
 
 


Re: sql-client configured with Hive fails to start

2021-01-07 Thread 叶贤勋
Is the HMS secured with Kerberos?
The community Hive connector does not currently support accessing a Kerberos-enabled HMS




On 2021-01-07 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh 
embedded

Error message: https://imgchr.com/i/smQrlj

Flink version: 1.12
Hive version: 3.1.0





Re: How to submit SQL jobs

2021-01-07 Thread LakeShen
On our side we actually have an underlying Flink JAR job; the Flink SQL code and the job's configuration parameters are passed
into that JAR as arguments. Of course, there are many other ways to do this nowadays; you can also check the link posted above.
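
A rough sketch of the approach described above (illustrative only, not the actual code): a generic Flink JAR job whose main() receives the SQL statements as program arguments and runs them through the Table API. The class name and argument handling are made up for the example.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlSubmitJob {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // each program argument is one SQL statement (DDL or DML) passed in by the submitter
        for (String statement : args) {
            tEnv.executeSql(statement);
        }
    }
}
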

Best,
LakeShen

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:

> You can try Zeppelin 0.9
> http://zeppelin.apache.org/
>
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
>
> > At my company, SQL jobs are still submitted with Jark's flink-sql-submit project. A few questions:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Now that Flink 1.12 has unified batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
>


Re: How to submit SQL jobs

2021-01-07 Thread Peihui He
You can try Zeppelin 0.9
http://zeppelin.apache.org/


jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:

> At my company, SQL jobs are still submitted with Jark's flink-sql-submit project. A few questions:
> 1. Is there a better way to submit SQL jobs?
> 2. Now that Flink 1.12 has unified batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?


Re: Native Kubernetes deployment with GitOps

2021-01-07 Thread Alex Adriaanse
Thanks for the reply!

Just to clarify, when I talked about standalone deployments I was referring to 
standalone Kubernetes deployments. We currently have no interest in running 
Flink outside of K8s. I was mostly just curious about the differences in the 
native integration vs. standalone deployment options on Kubernetes.

As a side note, we're planning on using Flink in Application Mode.

Alex

On Jan 7, 2021, at 4:17 PM, Israel Ekpo  wrote:

If you are just getting started, running it on Kubernetes could simplify that 
logistics and resources needed for getting started.

It also allows you to possibly reuse infrastructure that you may already be 
using for other projects and purposes.

If you are just getting started and just learning, the setup and teardown time 
is much faster with Kubernetes (in my experience) when compared to setting it 
as Standalone on VMs

Network connectivity to other components within my ecosystem is also easier 
with K8s.

Other options are available here for your reference

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#overview-and-reference-architecture

If you are just starting out, I would recommend deploying your containers on 
the K8S cluster via YAML manifests first and then convert to Helm charts (once 
the vanilla YAML is stable) so that you can bundle the various components as a 
single deployment.



On Thu, Jan 7, 2021 at 4:57 PM Alex Adriaanse  wrote:
I'm trying to set up a Flink 1.12 deployment on a Kubernetes cluster using 
custom Docker images (since the official ones aren't out yet). Since the 
documentation states that "We generally recommend new users to deploy Flink on 
Kubernetes using native Kubernetes deployments", I'm trying out the native 
Kubernetes integration.

At the same time, I'm also trying to move to GitOps where we use a tool like 
Argo CD to keep Kubernetes resources in sync with declarations stored in a Git 
repository. From what I can tell there's no way to do GitOps with native 
Kubernetes deployments, since the Kubernetes resources it produces are 
generated dynamically and are managed via CLI rather than being suitable for 
storage in a Git repository. Is that correct?

Would it be better in my case to just use the standalone Kubernetes deployment 
option and have our CD tool deploy the YAML files listed in Flink's 
documentation via Git? Or should I use the native Kubernetes integration to 
create an initial set of resources in our cluster, export those to a Git 
repository, and then use a GitOps workflow to modify those resources going 
forward rather than using the Flink CLI? In other words, I'd only use the Flink 
CLI to bootstrap the Git repository.

Besides ease of setup, are there any other benefits to the native Kubernetes 
integration compared to standalone deployments?

Thanks!

Alex



Re: Native Kubernetes deployment with GitOps

2021-01-07 Thread Israel Ekpo
If you are just getting started, running it on Kubernetes could simplify
that logistics and resources needed for getting started.

It also allows you to possibly reuse infrastructure that you may already be
using for other projects and purposes.

If you are just getting started and just learning, the setup and teardown
time is much faster with Kubernetes (in my experience) when compared to
setting it as Standalone on VMs

Network connectivity to other components within my ecosystem is also easier
with K8s.

Other options are available here for your reference

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#overview-and-reference-architecture

If you are just starting out, I would recommend deploying your containers
on the K8S cluster via YAML manifests first and then convert to Helm charts
(once the vanilla YAML is stable) so that you can bundle the various
components as a single deployment.



On Thu, Jan 7, 2021 at 4:57 PM Alex Adriaanse  wrote:

> I'm trying to set up a Flink 1.12 deployment on a Kubernetes cluster using
> custom Docker images (since the official ones aren't out yet). Since the
> documentation states that "We generally recommend new users to deploy Flink
> on Kubernetes using native Kubernetes deployments", I'm trying out the
> native Kubernetes integration.
>
> At the same time, I'm also trying to move to GitOps where we use a tool
> like Argo CD to keep Kubernetes resources in sync with declarations stored
> in a Git repository. From what I can tell there's no way to do GitOps with
> native Kubernetes deployments, since the Kubernetes resources it produces
> are generated dynamically and are managed via CLI rather than being
> suitable for storage in a Git repository. Is that correct?
>
> Would it be better in my case to just use the standalone Kubernetes
> deployment option and have our CD tool deploy the YAML files listed in
> Flink's documentation via Git? Or should I use the native Kubernetes
> integration to create an initial set of resources in our cluster, export
> those to a Git repository, and then use a GitOps workflow to modify those
> resources going forward rather than using the Flink CLI? In other words,
> I'd only use the Flink CLI to bootstrap the Git repository.
>
> Besides ease of setup, are there any other benefits to the native
> Kubernetes integration compared to standalone deployments?
>
> Thanks!
>
> Alex


Native Kubernetes deployment with GitOps

2021-01-07 Thread Alex Adriaanse
I'm trying to set up a Flink 1.12 deployment on a Kubernetes cluster using 
custom Docker images (since the official ones aren't out yet). Since the 
documentation states that "We generally recommend new users to deploy Flink on 
Kubernetes using native Kubernetes deployments", I'm trying out the native 
Kubernetes integration.

At the same time, I'm also trying to move to GitOps where we use a tool like 
Argo CD to keep Kubernetes resources in sync with declarations stored in a Git 
repository. From what I can tell there's no way to do GitOps with native 
Kubernetes deployments, since the Kubernetes resources it produces are 
generated dynamically and are managed via CLI rather than being suitable for 
storage in a Git repository. Is that correct?

Would it be better in my case to just use the standalone Kubernetes deployment 
option and have our CD tool deploy the YAML files listed in Flink's 
documentation via Git? Or should I use the native Kubernetes integration to 
create an initial set of resources in our cluster, export those to a Git 
repository, and then use a GitOps workflow to modify those resources going 
forward rather than using the Flink CLI? In other words, I'd only use the Flink 
CLI to bootstrap the Git repository.

Besides ease of setup, are there any other benefits to the native Kubernetes 
integration compared to standalone deployments?

Thanks!

Alex

Re: Using key.fields in 1.12

2021-01-07 Thread Aeden Jameson
 Brilliant, thank you. That will come in handy. I was looking through docs
hoping there was a way to still specify the schema with no luck. Does such
an option exist?
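
For reference, a minimal sketch of a new-style declaration that uses the key options listed further down in this thread; the table, topic, servers, and field names are illustrative, and the Avro schemas are derived from the table schema rather than from a 'format.avro-schema' option.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KeyFieldsExample {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql(
                "CREATE TABLE my_table (" +
                "  my_key_field STRING," +
                "  my_value_field STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-topic'," +
                "  'properties.bootstrap.servers' = '...'," +
                "  'properties.group.id' = 'my-consumer-group'," +
                "  'key.format' = 'avro'," +
                "  'key.fields' = 'my_key_field'," +
                "  'value.format' = 'avro'," +
                "  'value.fields-include' = 'EXCEPT_KEY'" +
                ")");
    }
}
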

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther  wrote:

> Hi Aeden,
>
> `format.avro-schema` is not required anymore in the new design. The Avro
> schema is derived entirely from the table's schema.
>
> Regards,
> Timo
>
>
>
> On 07.01.21 09:41, Aeden Jameson wrote:
> > Hi Timo,
> >
> > Thanks for responding. You're right. So I did update the properties.
> >>From what I can tell the new design you're referring to uses the
> > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> > support those options. Is that right? So I updated my configuration to
> >
> > connector= 'kafka'
> > topic   = 'my-topic'
> > properties.group.id = 'my-consumer-group'
> > properties.bootstrap.servers = '...'
> > format = 'avro'
> > format.avro-schema = ''
> > key.fields = 'my_key_field'
> >
> > However, the property format.avro-schema doesn't appear to be
> > supported by KafkaDynamicTableFactory. I get this exception.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> > options found for connector 'kafka'.
> >
> > Unsupported options:
> >
> > format.avro-schema
> >
> > Supported options:
> >
> > connector
> > format
> > key.fields
> > key.fields-prefix
> > key.format
> > properties.bootstrap.servers
> > properties.group.id
> > property-version
> > scan.startup.mode
> > scan.startup.specific-offsets
> > scan.startup.timestamp-millis
> > scan.topic-partition-discovery.interval
> > sink.parallelism
> > sink.partitioner
> > sink.semantic
> > topic
> > topic-pattern
> > value.fields-include
> > value.format
> >  at
> org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
> >  at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
> >  at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
> >  at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
> >  at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> >  ... 21 more
> >
> > FAILURE: Build failed with an exception.
> >
> >
> >
> >
> > The format.avro-schema property was supported it what looks to me the
> > old design in in KafkaTableSourceSinkFactoryBase with this line,
> >
> >  properties.add(FORMAT + ".*");
> >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
> >
> > Does format.avro-schema need to be specified differently?
> >
> > Thank you,
> > Aeden
> >
> > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:
> >>
> >> Hi Aeden,
> >>
> >> we updated the connector property design in 1.11 [1]. The old
> >> translation layer exists for backwards compatibility and is indicated by
> >> `connector.type=kafka`.
> >>
> >> However, `connector = kafka` indicates the new property design and
> >> `key.fields` is only available there. Please check all properties again
> >> when upgrading, they are mentioned here [2].
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
> >>
> >>
> >> On 06.01.21 18:35, Aeden Jameson wrote:
> >>> Yes, I do have that dependency. I see it in the dependency view of
> >>> intellij and directly. in the uber jar. Thanks for responding.
> >>>
> >>> - Aeden
> >>>
> >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski 
> wrote:
> 
>  Hey,
> 
>  have you added Kafka connector as the dependency? [1]
> 
>  [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> 
>  Best,
>  Piotrek
> 
>  śr., 6 sty 2021 o 04:37 Aeden Jameson 
> napisał(a):
> >
> > I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> > feature of the Kafa SQL Connector. My current connector is configured
> > as ,
> >
> > connector.type= 'kafka'
> > connector.version = 'universal'
> > connector.topic   = 'my-topic'
> > connector.properties.group.id = 'my-consumer-group'
> > connector.properties.bootstrap.servers = '...'
> > format.type = 'avro'
> > format.avro-schema = ''
> >
> > I tried adding
> >
> > key.fields = 'my_key_field'
> >
> > as well as
> >
> > key.format = 'avro'
> > key.fields = 'my_key_field'
> >
> > but I get the exception
> >
> > Caused by:
> 

Flink Kafka integration issues

2021-01-07 Thread narasimha
Hi,

Context:
Built a fraud detection kind of app.
Business logic is all fine, but when putting into production, Kafka cluster
is becoming unstable.

The topic it writes to receives approx. 80 events/sec. After running for a few
hours, the Kafka broker indexes get corrupted.

Topic config: single partition, replication 3.

This causes consumers on that broker to fail.

We haven't found any issues with load testing in non-prod environments.

Not sure what could be causing the issue.

Kafka version - 2.3
Flink version - 1.11.2- scala-2.12
Using same version Kafka consumer, Kafka producer.

Any suggestions are welcome on what the cause could be and which areas we
should look at.

-- 
Sent from my iPhone


SQL Function with more than one parameter IN

2021-01-07 Thread danp
Hi everyone, 

I just posted to the Apache Beam mailing list to find out if there is good 
syntax for my use case. I've been following Flink for the last couple of years, 
so I'm sending the same email here as well.

I've been reading through both the Beam and the Flink websites about UDFs and
UDAFs,
but I can't find an answer to my question, which is whether one can pass more
than one parameter to a UDF or UDAF. I'm working in finance, where for example
VWAP on time-series data is an important metric,
and for that metric you need both the price and the volume to get an
accurate result.

I've looked into
https://github.com/GoogleCloudPlatform/dataflow-sample-applications/tree/master/timeseries-streaming
which is a nice library for this kind of logic/metrics, but I still think
that SQL could be a better fit for
this kind of financial calculation.

Therefore I wonder whether it is possible to create a UDAF that takes, for
example, volume and price as two parameters
and, via a window function, builds an accumulator that gives back the result.
Can one use a PAIR
in an SQL statement to use a TableAggregate with more than one parameter in Flink
SQL?
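
For what it's worth, a rough sketch (not from this thread): in Flink SQL an AggregateFunction can take several arguments through its accumulate() method, so price and volume can be passed together. Class, field, and table names below are illustrative.

import org.apache.flink.table.functions.AggregateFunction;

public class WeightedAvg extends AggregateFunction<Double, WeightedAvg.Acc> {

    public static class Acc {
        public double weightedSum = 0;
        public double totalVolume = 0;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.totalVolume == 0 ? null : acc.weightedSum / acc.totalVolume;
    }

    // two parameters: price and volume
    public void accumulate(Acc acc, Double price, Double volume) {
        if (price != null && volume != null) {
            acc.weightedSum += price * volume;
            acc.totalVolume += volume;
        }
    }
}

// registration and use, e.g.:
// tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// tableEnv.executeSql(
//     "SELECT symbol, WeightedAvg(price, volume) AS vwap " +
//     "FROM trades GROUP BY symbol, TUMBLE(rowtime, INTERVAL '1' MINUTE)");
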

Thanks in advance, 

Regards
Dan 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink UDF registration from jar at runtime

2021-01-07 Thread Jakub N
Hi Dawid,

The approach you sent indeed solved our problem.

You helped me and my colleague tremendously, great thanks.

Kind regards,

Jakub



From: Dawid Wysakowicz
Sent: Tuesday, January 5, 2021 16:57
To: Jakub N
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime


Hi Jakub,


Sorry for a late reply. I've just came back from the Christmas holidays.


Unfortunately you're right and it's not as easy as I originally thought. 
Apologies for that. It works when you use a constant because of constant 
expression reduction, so the function is executed before the job is actually 
submitted to the cluster.


It does not work if the function needs to be invoked on the cluster, because 
tasks use a classloader built from the jar files shipped through the BlobManager. 
Tasks cannot simply use the thread classloader, because that does not make much 
sense in a distributed setup. What you could try, as somewhat of a hack, is to 
add the files with the generated classes to the cluster classpath. You could try 
this approach:


URLClassLoader functionClassloader = ...

try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(functionClassloader)) {
EnvironmentSettings fsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

Configuration effectiveConfiguration = new Configuration();
effectiveConfiguration.set(DeploymentOptions.TARGET, "local");
effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
ConfigUtils.encodeCollectionToConfig(
effectiveConfiguration,
PipelineOptions.CLASSPATHS,
Arrays.asList(functionClassloader.getURLs().clone()),
URL::toString);
StreamExecutionEnvironment fsEnv = new 
StreamExecutionEnvironment(effectiveConfiguration);

StreamTableEnvironment fsTableEnv = 
StreamTableEnvironment.create(fsEnv, fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

        List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);
fsTableEnv.createFunction("StringFunc", stringFunc);
fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM 
recipes").print();

//Surprisingly the following line can find the StringFunc
//fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM 
recipes").print();
}


This manually creates a LocalEnvironment with the URLs added to the cluster 
classpath. If you use a RemoteEnvironment you could use the constructor that accepts a 
globalClasspath. Bear in mind this uses lower-level APIs that have no stability 
guarantees.


Best,

Dawid


On 13/12/2020 21:22, Jakub N wrote:
Hi Dawid,

thanks a lot for your help. Unfortunately our issue still persists but with 
your example we managed to reconstruct our problem in the following code:

File folder = new File("custom");
URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
folder,
"StringFunc.java",
""
+ "import org.apache.flink.table.functions.ScalarFunction;"
+ "\n"
+ "public class StringFunc extends ScalarFunction {\n"
+ "\tpublic String eval(String b) {\n"
+ "\t\treturn b + \" : udf invoked\";\n"
+ "\t}\n"
+ "}"
);

EnvironmentSettings fsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment fsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, 
fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

Class<? extends ScalarFunction> stringFunc = (Class<? extends ScalarFunction>) 
functionClassloader.loadClass("StringFunc");

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);

try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(functionClassloader)) {
fsTableEnv.createFunction("StringFunc", stringFunc);
fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

//Surprisingly the following line can find the StringFunc
//fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM 
recipes").print();
}

This leads to a `java.lang.ClassNotFoundException: StringFunc`. As mentioned in 
the code, the commented line surprisingly works as intended.
Do you have any ideas on why this the case?

Kind regards,

Jakub



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Yun Gao
Hi Roman,

   Very thanks for the feedbacks! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds 
> complexity (new event). 
> However, the resources would be wasted for a relatively short time until the 
> job finishes completely. 
> And compared to other options, complexity seems much lower. Or are 
> differences in task completion times so huge and so common?

There might be mixed jobs with both bounded and unbounded sources; in 
that case, the resources for the finished 
part of the job could not be released.

Option 1 also complicates the semantics of EndOfPartition: if 
we hold the tasks but still need to 
notify the following tasks that all records have been sent, we would have to 
introduce some kind of pre-EndOfPartition message, 
which is similar to the current EndOfPartition but does not cause the channels 
to be released.

> 2. I think it would be helpful to describe how is rescaling handled in 
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage state at the granularity of the operator, so the 
process of rescaling is the same as with a normal checkpoint.
For example, suppose one operator resides in a task with parallelism 4. If 2 
of the subtasks are finished, the state of the operator is now composed 
of the state of the 2 remaining subtask instances; if we rescale to 5 after 
failover, the state of those 2 remaining subtasks would be 
re-distributed 
to the 5 new subtasks after failover. 

If all 4 subtasks were finished before failover, the operator would be 
marked as finished; after failover the operator would still be marked as 
finished, 
and all subtask instances of this operator would skip methods like 
open(), endOfInput(), and close() and would be excluded when taking checkpoints 
after failover.


> 3. Option 3 assumes that the state of a finished task is not used. That's 
> true for operator state, but what about channel state (captured by unaligned 
> checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if in one checkpoint a subtask is marked as finished, 
its descendant tasks wait until all records have been received 
from the finished tasks before taking the checkpoint; thus in this case we would 
not have result partition state, only input channel state for the 
downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose in one 
checkpoint A has reported FINISHED. The CheckpointCoordinator would 
choose B as the new "source" to trigger the checkpoint via RPC. When task B 
receives the checkpoint trigger, it knows that all its predecessor tasks 
are finished, so it waits until every InputChannel has received 
EndOfPartition from the network (namely inputChannel.onBuffer() is called with 
EndOfPartition) and then snapshots the input channels, as normal 
unaligned checkpoints do on the InputChannel side. This way 
we can ensure that finished tasks always have an empty state.

I'll also update the FLIP to make this clearer.

Best,
 Yun



------------------ Original Mail ------------------
Sender: Khachatryan Roman 
Send Date: Thu Jan 7 21:55:52 2021
Recipients: Arvid Heise 
CC: dev , user 
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion (and sorry for probably duplicated 
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds 
complexity (new event). 
However, the resources would be wasted for a relatively short time until the 
job finishes completely. 
And compared to other options, complexity seems much lower. Or are differences 
in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in Options 
2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true 
for operator state, but what about channel state (captured by unaligned 
checkpoint)? I think it still has to be sent downstream which invalidates this 
Option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)

I think we are mixing two different things here that may require different 
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only 

Re: How to submit SQL jobs

2021-01-07 Thread Sebastian Liu
You can give this a try: https://github.com/ververica/flink-sql-gateway

If you have related requirements, feel free to open an issue.

jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:

> At my company, SQL jobs are still submitted with Jark's flink-sql-submit project. A few questions:
> 1. Is there a better way to submit SQL jobs?
> 2. Now that Flink 1.12 has unified batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?



-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


RE: Flink kafka exceptions handling

2021-01-07 Thread BELGHITH Amira (EXT)
Thanks for your feedbacks.

Please find below my answers:



-----Original Message-----

From: Aljoscha Krettek 

Sent: Thursday, January 7, 2021 1:55 PM

To: user@flink.apache.org

Subject: Re: Flink kafka exceptions handling








Hi,



When you say that the `JobManager` goes down, you're referring to the fact that 
the Flink job will finish in a failed state after too many exceptions have 
occurred in the `FlinkKafkaConsumer`. Is that correct?



--> Yes, this is exactly what I meant, the Flink job is in a failed state



I'm afraid right now there is no code path that would allow catching those 
`TopicAuthorizationException`. We basically treat most exceptions coming from 
Kafka as errors that require recovery.



--> We can get other exceptions as well, like TimeoutException when our Kafka 
brokers are degraded.



What behaviour would you have in mind as a reaction to those exceptions?



--> Our processing system is supposed to continue streaming data even when 
there are some Kafka errors; we expect the KafkaConsumer to fail but 
not the Flink job. Do you think that is possible?



Best,

Aljoscha



On 2021/01/06 17:59, BELGHITH Amira (EXT) wrote:

>

>Thank you for your answer.

>I have been subscribed.

>

>This is the previous topic I’m referring to

>http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACz

>KVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

>

>Our Flink JobManager fails after multiple restarts, for example when the Kafka consumer 
>does not find a topic. We get a Kafka exception, 
>TopicAuthorizationException. We listen to a list of topics, and whenever one of them is 
>down, our whole streaming system goes down. Is there a way to handle those 
>exceptions in the FlinkKafkaConsumer so the JobManager does not fail?

>

>

>From: Amira Belghith  Sent: Wednesday, January 6,

>2021 6:36 PM To: BELGHITH Amira (EXT) ResgGtsOpmOptVdf

>; amira.belghith-...@soge.com Subject:

>Fwd: Flink kafka exceptions handling

>


>

>

>

>---------- Forwarded message ----------

>From: Piotr Nowojski 

>Date: Wed, Jan 6, 2021 at 5:26 PM

>Subject: Re: Flink kafka exceptions handling To: Amira Belghith

>belghith.am...@gmail.com

>Cc: buggi...@gmail.com

>

>

>I think you first need to be subscribed, as explained here [1]. Could you 
>also link to the previous topic you are referring to?

>

>Piotrek

>

>[1] https://flink.apache.org/community.html#mailing-lists

>

>On Wed, Jan 6, 2021 at 5:09 PM Amira Belghith 
> wrote:

>Hey,

>Thanks for your fast reply.

>The mail couldn't be delivered to the mailing list.

>

>On Wed, Jan 6, 2021 at 4:59 PM Piotr Nowojski 
> wrote:

>Hey,

>

>could you post the question on the user 
>(user@flink.apache.org) mailing list?

>

>Thanks,

>Piotrek

>

>On Wed, Jan 6, 2021 at 3:11 PM Amira Belghith 
> wrote:

>Hi Nick, Piotr,

>

>I'm a software engineer working for Societe Generale bank.

>I saw your discussion about FlinkKafkaConsumer and exceptions handling.

>I have had the same problem for a week now, and I wanted to know if you have found 
>a solution.

>Our Flink JobManager fails after multiple restarts, for example when the Kafka consumer 
>does not find a topic. We get a Kafka exception, 
>TopicAuthorizationException. We listen to a list of topics, and whenever one of them is 
>down, our whole streaming system goes down. Is there a way to handle those 
>exceptions in the FlinkKafkaConsumer so the JobManager does not fail?

>

>Thanks a lot for your help,

>Amira belghith

>



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Khachatryan Roman
Thanks for starting this discussion (and sorry for probably duplicated
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (new event).
However, the resources would be wasted for a relatively short time until
the job finishes completely.
And compared to other options, complexity seems much lower. Or are
differences in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in
Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's
true for operator state, but what about channel state (captured by
unaligned checkpoint)? I think it still has to be sent downstream which
invalidates this Option.

Regards,
Roman


On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>
>
> I think we are mixing two different things here that may require different
> solutions:
> 1. Tasks (=sink) that may need to do something with the final checkpoint.
> 2. Tasks that only finish after having finished operations that do not
> depend on data flow (async I/O, but I could also think of some timer
> actions in process functions).
>
> Your proposal would help most for the first case. The second case can
> solved entirely with current methods without being especially complicated:
> - EOP is only emitted once Async I/O is done with all background tasks
> - All timers are fired in a process function (I think we rather want to
> fire immediately on EOP but that's a different discussion)
> The advantage of this approach over your idea is that you don't need to
> wait for a checkpoint to complete to check for finalization.
>
> Now let's look at the first case. I see two alternatives:
> - The new sink interface implicitly incorporates this listener. Since I
> don't see a use case outside sinks, we could simply add this method to the
> new sink interface.
> - We implicitly assume that a sink is done after having a successful
> checkpoint at the end. Then we just need a tag interface
> `RequiresFinalization`. It also feels like we should add the property
> `final` to checkpoint options to help the sink detect that this is the last
> checkpoint to be taken. We could also try to always have the final
> checkpoint without tag interface on new sinks...
>
> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
> wrote:
>
>> This is somewhat unrelated to the discussion about how to actually do
>> the triggering when sources shut down, I'll write on that separately. I
>> just wanted to get this quick thought out.
>>
>> For letting operators decide whether they actually want to wait for a
>> final checkpoint, which is relevant at least for Async I/O and
>> potentially for sinks.
>>
>> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>>
>> This way we would decouple that logic from things that don't actually
>> need it. What do you think?
>>
>> Best,
>> Aljoscha
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Flink 1.12: how can I downgrade avro to 1.8.2?

2021-01-07 Thread Dacheng
Hi all,


Problems encountered when downgrading avro
The 1.12 official docs mention that avro 1.10 is now used, but that it can be downgraded to 1.8.2 if needed:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/release-notes/flink-1.12.html#upgrade-to-avro-version-1100-from-182-flink-18192
When I tried the downgrade, I ran into the problems below.


1. Depending directly on 1.8.2 makes AvroSchemaConverter#nullableSchema in flink-avro fail with
java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
at 
org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:448)
Comparing Avro 1.8.2 and 1.10.0, I found that the Schema.isNullable() method simply does not exist in 1.8.2.


2. To work around problem 1, I tried using the maven shade plugin to relocate the org.apache.avro dependency.
The pom is at https://paste.ubuntu.com/p/SMYHy66bc6/
But it fails with
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-shade-plugin:3.2.0:shade (shade-deps) on project 
leyan-flink: Error creating shaded jar: Problem shading JAR 
/Users/dongzhi/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar
 entry 
META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
 org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class 
META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
 UnsupportedOperationException -> [Help 1]
[ERROR]
I have not found a solution for this yet, hence this request for help from the community.


Why I want to downgrade avro: problems when integrating flink-sql with hive
Also, avro 1.10 worked fine for me before.
But when I added the hive dependencies in order to use hive's built-in functions from SQL, with the dependencies
flink-avro-confluent-registry-1.12.0
flink-connector-hive_2.11-1.12.0(provided)
hive-exec 2.3.4 (provided)
hadoop-mapreduce-client-core 2.6.0-cdh5.13.3 <= added because I hit the same problem as
http://apache-flink.147419.n8.nabble.com/Flink-td7866.html
and the extra jars under flink/lib being
flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
hadoop-mapreduce-client-core-2.6.0-cdh5.13.3.jar
I ran into an error similar to https://github.com/confluentinc/schema-registry/issues/1432,
which is why I am trying to downgrade avro to 1.8.2:
java.lang.NoSuchFieldError: FACTORY
at org.apache.avro.Schemas.toString(Schemas.java:36) 
~[trade-event.jar:1.12.0] 
However, analysing the whole project with mvn dependency:tree shows that nothing depends on avro 1.8.2.
The Flink application is deployed on YARN in per-job mode; the production CDH cluster ships a rather old avro version, and I am not sure whether that is what causes avro 1.8.2 to be picked up somewhere.


Thanks

Re: Flink kafka exceptions handling

2021-01-07 Thread Aljoscha Krettek

Hi,

When you say that the `JobManager` goes down, you're referring to the 
fact that the Flink job will finish in a failed state after too many 
exceptions have occurred in the `FlinkKafkaConsumer`. Is that correct?


I'm afraid right now there is no code path that would allow catching 
those `TopicAuthorizationException`s. We basically treat most exceptions 
coming from Kafka as errors that require recovery.


What behaviour would you have in mind as a reaction to those exceptions?

Best,
Aljoscha

On 2021/01/06 17:59, BELGHITH Amira (EXT) wrote:


Thank you for your answer.
I have been subscribed.

This is the previous topic I’m referring to 
http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

Our flink job manager fails after multiple restarting, when the Kafka Consumer 
does not find a topic for example. We have a kafka exception 
TopicUnthaurizationException. We listen to a list a topics and whenever one is 
down , all our streaming system is down .. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?


De : Amira Belghith 
Envoyé : mercredi 6 janvier 2021 18:36
À : BELGHITH Amira (EXT) ResgGtsOpmOptVdf ; 
amira.belghith-...@soge.com
Objet : Fwd: Flink kafka exceptions handling




-- Message transféré -
De : Piotr Nowojski mailto:pnowoj...@apache.org>>
Date : mer. 6 janv. 2021 à 17:26
Objet : Re: Flink kafka exceptions handling
À : Amira Belghith mailto:belghith.am...@gmail.com>>
CC : buggi...@gmail.com 
mailto:buggi...@gmail.com>>

I think you first need to be subscribed as it's explained here [1]. Could you 
also link to which previous topic are you referring to?

Piotrek

[1] https://flink.apache.org/community.html#mailing-lists

śr., 6 sty 2021 o 17:09 Amira Belghith 
mailto:belghith.am...@gmail.com>> napisał(a):
Hey,
Thanks for your fast reply.
The mail couldnt be delivered to the mailing list.

Le mer. 6 janv. 2021 à 16:59, Piotr Nowojski 
mailto:pnowoj...@apache.org>> a écrit :
Hey,

could you post the question on the user 
mailto:user@flink.apache.org>> mailing list?

Thanks,
Piotrek

śr., 6 sty 2021 o 15:11 Amira Belghith 
mailto:belghith.am...@gmail.com>> napisał(a):
Hi Nick, Piotr,

Im a software engineer working for Societe Generale bank.
I saw your discussion about FlinkKafkaConsumer and exceptions handling.
I have the same problem for a week now, and I wanted to know if you have found 
a solution.
Our flink job manager fails after multiple restarting, when the Kafka Consumer 
does not find a topic for example. We have a kafka exception 
TopicUnthaurizationException. We listen to a list a topics and whenever one is 
down , all our streaming system is down .. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?

Thanks a lot for your help,
Amira belghith
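
Following up on Aljoscha's point above that the consumer treats these Kafka errors as failures requiring recovery: there is no way to swallow the exception inside the FlinkKafkaConsumer itself, but the job's reaction can at least be bounded by tuning the restart strategy. A minimal sketch (the attempt count and delay are placeholder values, not recommendations; if the authorization problem persists, the job will still fail once the attempts are exhausted):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Retry the failed job a bounded number of times with a delay between
        // attempts instead of giving up on the first consumer failure.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                  // restart attempts (placeholder)
                Time.seconds(30))); // delay between attempts (placeholder)

        // ... build the Kafka source and the rest of the pipeline here ...
    }
}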



How should SQL jobs be submitted?

2021-01-07 Thread jiangjiguang719
At the moment my company still submits SQL jobs with Jark's flink-sql-submit project, and I would like to ask:
1. Is there a better way to submit SQL jobs?
2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Arvid Heise
>
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)


I think we are mixing two different things here that may require different
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only finish after having finished operations that do not
depend on data flow (async I/O, but I could also think of some timer
actions in process functions).

Your proposal would help most for the first case. The second case can be
solved entirely with current methods without being especially complicated:
- EOP is only emitted once Async I/O is done with all background tasks
- All timers are fired in a process function (I think we rather want to
fire immediately on EOP but that's a different discussion)
The advantage of this approach over your idea is that you don't need to
wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I
don't see a use case outside sinks, we could simply add this method to the
new sink interface.
- We implicitly assume that a sink is done after having a successful
checkpoint at the end. Then we just need a tag interface
`RequiresFinalization`. It also feels like we should add the property
`final` to checkpoint options to help the sink detect that this is the last
checkpoint to be taken. We could also try to always have the final
checkpoint without tag interface on new sinks...

On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
wrote:

> This is somewhat unrelated to the discussion about how to actually do
> the triggering when sources shut down, I'll write on that separately. I
> just wanted to get this quick thought out.
>
> For letting operators decide whether they actually want to wait for a
> final checkpoint, which is relevant at least for Async I/O and
> potentially for sinks.
>
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)
>
> This way we would decouple that logic from things that don't actually
> need it. What do you think?
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-07 Thread Yun Gao
Hi Billy,

I checked the provided example and found it should be a problem of
ContinuousFileReader, and I created an issue for it [1]. To temporarily work
around the issue, I think you may disable chaining of the
ContinuousFileReaderOperator with the following operators:

    android.disableChaining().sinkTo(sink);
Best,
 Yun

[1] https://issues.apache.org/jira/browse/FLINK-20888




 --Original Mail --
Sender:Billy Bain 
Send Date:Thu Jan 7 04:02:34 2021
Recipients:Arvid Heise 
CC:user , Billy Bain 
Subject:Re: Implementing a TarInputFormat based on FileInputFormat

Hi Arvid, 

Thanks for the response. I have created a sample application with input data 
and uploaded it to google drive. The sample data is in the archive... thus the 
large size. (27 mb)

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run  -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader 
/path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path 
/path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(
                new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix(".json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise  wrote:

Hi Billy,

the exception is happening on the output side. Input side looks fine. Could you 
maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

I am trying to implement a class that will work similar to AvroFileFormat. 

This tar archive has a very specific format. It has only one file inside and 
that file is line delimited JSON. 

I get this exception, but all the data is written to the temporary files. I 
have checked that my code isn't closing the stream, which was my prior issue. 

Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at 
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at 
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
at 
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at 
org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:836)

public class TarInputFormat<E> extends FileInputFormat<E> implements
ResultTypeQueryable<E> {

    private static final Logger logger =
            LoggerFactory.getLogger(TarInputFormat.class);
   

Flink 1.12 fails when triggering a savepoint

2021-01-07 Thread 赵一旦
The error message is as follows:
java.lang.IllegalArgumentException: Can not set long field
com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeLongFieldAccessorImpl.set(
UnsafeLongFieldAccessorImpl.java:80)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:409)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)


根据堆栈找到报错位置代码为:

try {
    for (int i = 0; i < numFields; i++) {
        boolean isNull = source.readBoolean();

        if (fields[i] != null) {
            if (isNull) {
                fields[i].set(target, null); // fails here: writing null, but this field is a primitive long, not the boxed type
            } else {
                Object field = fieldSerializers[i].deserialize(source);
                fields[i].set(target, field);
            }
        } else if (!isNull) {
            // read and dump a pre-existing field value
            fieldSerializers[i].deserialize(source);
        }
    }
} catch (IllegalAccessException e) {
    throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}

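A likely explanation, judging only from the stack trace (an assumption, not a confirmed diagnosis): the savepoint contains a null for the timestamp field, but the field is declared as the primitive long, which cannot hold null. Declaring it with the boxed Long type would let the PojoSerializer restore the null value. A minimal sketch of the hypothetical POJO:

// Hypothetical reconstruction of the POJO; only the field type changes.
public abstract class AbstractDrRecord {

    // Was: private long timestamp;  -- a primitive cannot be assigned null on restore.
    private Long timestamp;

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}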

Re: Flink kafka exceptions handling

2021-01-07 Thread Piotr Nowojski
Hi Amira,

I think the previous topic you are referring to doesn't seem to be related
with your current problem.

Regarding your problem, I'm afraid I don't know the FlinkKafkaConsumer code
too well. Maybe someone else from the community could help?

Best,
Piotrek

śr., 6 sty 2021 o 19:01 BELGHITH Amira (EXT) 
napisał(a):

>
>
> Thank you for your answer.
>
> I have been subscribed.
>
>
>
> This is the previous topic I’m referring to
> http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E
>
>
>
> *Our flink job manager fails after multiple restarting, when the Kafka
> Consumer does not find a topic for example. We have a kafka exception
> TopicUnthaurizationException. We listen to a list a topics and whenever one
> is down , all our streaming system is down .. is there a way to handle
> those exceptions in the FlinkKafkaConsumer so the job manager does not
> fail?*
>
>
>
>
>
> *De :* Amira Belghith 
> *Envoyé :* mercredi 6 janvier 2021 18:36
> *À :* BELGHITH Amira (EXT) ResgGtsOpmOptVdf ;
> amira.belghith-...@soge.com
> *Objet :* Fwd: Flink kafka exceptions handling
>
>
>
>
>
>
>
>
>
>
> -- Message transféré -
> De : *Piotr Nowojski* 
> Date : mer. 6 janv. 2021 à 17:26
> Objet : Re: Flink kafka exceptions handling
> À : Amira Belghith 
> CC : buggi...@gmail.com 
>
>
>
> I think you first need to be subscribed as it's explained here [1]. Could
> you also link to which previous topic are you referring to?
>
>
>
> Piotrek
>
>
>
> [1] https://flink.apache.org/community.html#mailing-lists
>
>
>
> śr., 6 sty 2021 o 17:09 Amira Belghith 
> napisał(a):
>
> Hey,
>
> Thanks for your fast reply.
>
> The mail couldnt be delivered to the mailing list.
>
>
>
> Le mer. 6 janv. 2021 à 16:59, Piotr Nowojski  a
> écrit :
>
> Hey,
>
>
>
> could you post the question on the user  mailing
> list?
>
>
>
> Thanks,
>
> Piotrek
>
>
>
> śr., 6 sty 2021 o 15:11 Amira Belghith 
> napisał(a):
>
> Hi Nick, Piotr,
>
>
>
> Im a software engineer working for Societe Generale bank.
>
> I saw your discussion about FlinkKafkaConsumer and exceptions handling.
>
> I have the same problem for a week now, and I wanted to know if you have
> found a solution.
>
> Our flink job manager fails after multiple restarting, when the Kafka
> Consumer does not find a topic for example. We have a kafka exception
> TopicUnthaurizationException. We listen to a list a topics and whenever one
> is down , all our streaming system is down .. is there a way to handle
> those exceptions in the FlinkKafkaConsumer so the job manager does not fail?
>
>
>
> Thanks a lot for your help,
>
> Amira belghith
>
>
>


Flink app logs to Elastic Search

2021-01-07 Thread bat man
Hi Team,

I have a requirement to push the flink app logs to Elastic Search for log
management. Can anyone guide me if you are already doing this?
I have tried this -
https://cristian.io/post/flink-log4j/
I’m not getting any error for a sample job I tried.

I am using EMR to run Flink 1.9 and Elastic Search latest version running
on ec2 machine.

Thanks,
Hemant


sql-client配置hive启动不了

2021-01-07 Thread amenhub
在sql-client-defaults.yaml配置hive catalog信息后,无法通过sql-client.sh embedded启动sql 
client

报错信息:https://imgchr.com/i/smQrlj

flink 版本,1.12
hive 版本,3.1.0





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Aljoscha Krettek
This is somewhat unrelated to the discussion about how to actually do 
the triggering when sources shut down, I'll write on that separately. I 
just wanted to get this quick thought out.


For letting operators decide whether they actually want to wait for a 
final checkpoint, which is relevant at least for Async I/O and 
potentially for sinks.


We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)


This way we would decouple that logic from things that don't actually 
need it. What do you think?


Best,
Aljoscha
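
A minimal sketch of what such an interface could look like, purely to make the proposal above concrete (the name, method, and placement are placeholders, not an agreed design):

/**
 * Sketch of the proposed hook: an operator implementing this interface
 * signals when it has finished its remaining work (pending async requests,
 * final committables, ...) so the framework knows the task can shut down.
 */
public interface RequiresFinalization {

    /**
     * Called by the framework after each completed checkpoint (and possibly
     * at other points) once the operator has seen its end of input.
     *
     * @return true when the operator has nothing left to flush or commit
     */
    boolean isFinalized();
}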


Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther

Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro 
schema is derived entirely from the table's schema.


Regards,
Timo
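
For reference, a minimal new-style table declaration along these lines might look as follows (table name, columns, and broker address are made-up placeholders; the Avro schemas for key and value are derived from the declared columns):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaKeyFieldsExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // No format.avro-schema option: the schema comes from the column list.
        tableEnv.executeSql(
                "CREATE TABLE my_topic (\n"
                        + "  my_key_field STRING,\n"
                        + "  my_value_field STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'my-topic',\n"
                        + "  'properties.bootstrap.servers' = '...',\n"
                        + "  'properties.group.id' = 'my-consumer-group',\n"
                        + "  'format' = 'avro',\n"
                        + "  'key.format' = 'avro',\n"
                        + "  'key.fields' = 'my_key_field'\n"
                        + ")");
    }
}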



On 07.01.21 09:41, Aeden Jameson wrote:

Hi Timo,

Thanks for responding. You're right. So I did update the properties.

From what I can tell the new design you're referring to uses the

KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

connector= 'kafka'
topic   = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = ''
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
 at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
 at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
 at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
 at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
 at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
 ... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported it what looks to me the
old design in in KafkaTableSourceSinkFactoryBase with this line,

 properties.add(FORMAT + ".*");

 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:


Hi Aeden,

we updated the connector property design in 1.11 [1]. The old
translation layer exists for backwards compatibility and is indicated by
`connector.type=kafka`.

However, `connector = kafka` indicates the new property design and
`key.fields` is only available there. Please check all properties again
when upgrading, they are mentioned here [2].

Regards,
Timo


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:

Yes, I do have that dependency. I see it in the dependency view of
intellij and directly. in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:


Hey,

have you added Kafka connector as the dependency? [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):


I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,

connector.type= 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = ''

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
  at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
  at 

flink的算子没有类似于spark的cache操作吗?

2021-01-07 Thread 李继
HI , 请问当一个算子会被多次使用时,怎么把他缓存住,类似于spark的cache操作

val env = getBatchEnv
val ds = env.fromElements("a","b","c")

val ds2 = ds.map(x=>{
  println("map op")
  x.charAt(0).toInt+1
})

//此操作会打印三遍map op
ds2.print()

//此操作又会打印三遍map op
ds2.filter(_>100).print()


Re: Using key.fields in 1.12

2021-01-07 Thread Aeden Jameson
Hi Timo,

Thanks for responding. You're right. So I did update the properties.
From what I can tell the new design you're referring to uses the
KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

connector= 'kafka'
topic   = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = ''
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported it what looks to me the
old design in in KafkaTableSourceSinkFactoryBase with this line,

properties.add(FORMAT + ".*");


https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:
>
> Hi Aeden,
>
> we updated the connector property design in 1.11 [1]. The old
> translation layer exists for backwards compatibility and is indicated by
> `connector.type=kafka`.
>
> However, `connector = kafka` indicates the new property design and
> `key.fields` is only available there. Please check all properties again
> when upgrading, they are mentioned here [2].
>
> Regards,
> Timo
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>
>
> On 06.01.21 18:35, Aeden Jameson wrote:
> > Yes, I do have that dependency. I see it in the dependency view of
> > intellij and directly. in the uber jar. Thanks for responding.
> >
> > - Aeden
> >
> > On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:
> >>
> >> Hey,
> >>
> >> have you added Kafka connector as the dependency? [1]
> >>
> >> [1] 
> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>
> >> Best,
> >> Piotrek
> >>
> >> śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):
> >>>
> >>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>> feature of the Kafa SQL Connector. My current connector is configured
> >>> as ,
> >>>
> >>> connector.type= 'kafka'
> >>> connector.version = 'universal'
> >>> connector.topic   = 'my-topic'
> >>> connector.properties.group.id = 'my-consumer-group'
> >>> connector.properties.bootstrap.servers = '...'
> >>> format.type = 'avro'
> >>> format.avro-schema = ''
> >>>
> >>> I tried adding
> >>>
> >>> key.fields = 'my_key_field'
> >>>
> >>> as well as
> >>>
> >>> key.format = 'avro'
> >>> key.fields = 'my_key_field'
> >>>
> >>> but I get the exception
> >>>
> >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>> Could not find a suitable table factory for
> >>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>> the classpath.
> >>>
> >>> Reason: No factory supports all properties.
> >>>
> >>> The matching candidates:
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>> Unsupported property keys:
> >>> key.fields
> >>> key.format
> >>>
> >>> The following factories have been considered:
> >>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>  at 
> >>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>  at 
> >>> 

Re: Re:Re: Re: Monitoring Flink jobs with grafana: which metrics to pick for alerting

2021-01-07 Thread bradyMk
Okay, I will look into it. Thanks for the guidance!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Monitoring Flink jobs with grafana: which metrics to pick for alerting

2021-01-07 Thread jiangjiguang719
1. Yes, it is developed in-house.
2. A scheduled job calls
http://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API





At 2021-01-07 16:15:25, "bradyMk"  wrote:
>Hi~
>May I ask:
>1) Is the real-time computing platform you mentioned developed in-house?
>2) How do you call the YARN REST API every minute to get the job status? Is it a scheduled script? I have never called the YARN REST API and don't yet know how to call it...
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
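
A minimal sketch of that kind of poll against the YARN ResourceManager REST API (the ResourceManager host and port are placeholders, and a real implementation would parse the JSON and alert when an expected job is not in the list):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class YarnRunningAppsPoller {

    public static void main(String[] args) throws Exception {
        // Placeholder ResourceManager address; adjust to your cluster.
        URL url = new URL("http://resourcemanager-host:8088/ws/v1/cluster/apps?states=RUNNING");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // The JSON body lists the running applications; compare it against
            // the jobs you expect and raise an alert for anything missing.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}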


Re: Monitoring Flink jobs with grafana: which metrics to pick for alerting

2021-01-07 Thread bradyMk
Hi Yun Tang~

I don't quite understand why monitoring this metric requires maintaining state. What state would need to be kept, and how should it be maintained?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Monitoring Flink jobs with grafana: which metrics to pick for alerting

2021-01-07 Thread bradyMk
Hi~
May I ask:
1) Is the real-time computing platform you mentioned developed in-house?
2) How do you call the YARN REST API every minute to get the job status? Is it a scheduled script? I have never called the YARN REST API and don't yet know how to call it...



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther

Hi Aeden,

we updated the connector property design in 1.11 [1]. The old 
translation layer exists for backwards compatibility and is indicated by 
`connector.type=kafka`.


However, `connector = kafka` indicates the new property design and 
`key.fields` is only available there. Please check all properties again 
when upgrading, they are mentioned here [2].


Regards,
Timo


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:

Yes, I do have that dependency. I see it in the dependency view of
intellij and directly. in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:


Hey,

have you added Kafka connector as the dependency? [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):


I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,

connector.type= 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = ''

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
 at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
 at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
 at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
 at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
 ... 21 more

I have validated that the uber jar clearly contains the 1.12
dependencies. What is that magic combination of properties to get
key.fields to work? Or is it not supported with avro?

--
Thank You,
Aeden