Flink 1.11.2 checkpoint failures

2020-12-17 Thread zhy
Hi,
I'm using Flink 1.11.2 to compute real-time features. Once the state size reaches roughly 30 GB the job can no longer make progress, and the exception logs are full of InterruptedException. Is this a cluster problem or a Flink problem? Another job with about 3 GB of state is still running normally.


Does Flink 1.9.1 support one source consuming multiple topics?

2020-12-17 Thread bradyMk
Hi, I'd like to ask the list:

I recently noticed via the flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_rate metric that
a job consuming only topic A shows a higher value than a job consuming topics A, B, C and D together,
i.e. the records consumed per second across all four topics is lower than for the single topic.
So my question is: does Flink 1.9.1 support one source consuming multiple specified topics?
My code is as follows:
val sceneDataTopic = params.getProperty("kafka.scene.data.topic")
val sceneLogTopic = params.getProperty("kafka.scene.log.topic")
val eventTopic = params.getProperty("kafka.event.topic")
val sdkLog = params.getProperty("kafka.log.topic")
import scala.collection.JavaConverters._
// one consumer subscribed to all four topics
val topics = List[String](sceneDataTopic, sceneLogTopic, eventTopic, sdkLog).asJava
env.addSource(new FlinkKafkaConsumer011(topics, new
JSONKeyValueDeserializationSchema(false), kafkaPro))
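
For reference, the Kafka consumer has accepted a list of topics in a single source for a long time, including 1.9.x. A minimal Java sketch, with the broker address, group id and topic names as placeholders:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class MultiTopicSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "multi-topic-demo");     // placeholder

        // A single consumer instance subscribed to several topics.
        List<String> topics = Arrays.asList("topicA", "topicB", "topicC", "topicD");
        env.addSource(new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props))
           .print();

        env.execute("multi-topic-demo");
    }
}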




-
Best Wishes


org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Topic not present in metadata after 60000 ms.

2020-12-17 Thread Level1accelerator
Version: 1.11.2
Producing with FlinkKafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig) works fine; data is written without any errors.

With this variant,
FlinkKafkaProducer(
    String defaultTopicId,
    KeyedSerializationSchema serializationSchema,
    Properties producerConfig,
    Optional<FlinkKafkaPartitioner> customPartitioner)
the error above is thrown. I don't know why; could someone explain?
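
For reference, a minimal Java sketch of the two constructor variants being compared; the topic name, properties and the fixed partitioner are illustrative placeholders, not taken from the original job. Passing an empty Optional lets the Kafka client partition records itself, while FlinkFixedPartitioner pins each Flink subtask to one Kafka partition:

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ProducerVariantsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder

        // Variant 1: plain SerializationSchema, always writes to "my_topic".
        FlinkKafkaProducer<String> simple =
                new FlinkKafkaProducer<>("my_topic", new SimpleStringSchema(), props);

        // Variant 2: KeyedSerializationSchema plus an explicit partitioner
        // (this constructor is the deprecated one in 1.11).
        FlinkKafkaProducer<String> keyed =
                new FlinkKafkaProducer<>(
                        "my_topic",
                        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                        props,
                        Optional.<FlinkKafkaPartitioner<String>>of(new FlinkFixedPartitioner<>()));
    }
}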






How to clean up state for a flink 1.12 group by current_date, userId job

2020-12-17 Thread ??????
Hi, with flink 1.12 and a SQL job that does group
by current_date, userId, how should the old flink
state be cleaned up? Two options come to mind: 1) the DataStream-style state TTL,
2) tabEnv.getConfig().setIdleStateRetention(Duration.ofDays(1))
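
A minimal Java sketch of the two options mentioned above, assuming Flink 1.12; names and values are illustrative:

import java.time.Duration;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateRetentionSketch {
    public static void main(String[] args) {
        // Option 2: idle state retention for Table/SQL jobs.
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
        tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));

        // Option 1: per-state TTL in the DataStream API, attached to a
        // state descriptor inside a keyed function.
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", Long.class);
        descriptor.enableTimeToLive(ttl);
    }
}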

flink 1.11 interval join: RocksDB memory usage questions

2020-12-17 Thread 867127831
Hi,


I have a flink 1.11 job on k8s: an interval join SQL job using the RocksDB state backend with Flink
managed memory (state.backend.rocksdb.memory.managed=true), running in k8s pods.


flink sql:


insert into console_sink
select t1.*, t2.*
from t1 left join t2
on t1.unique_id = t2.unique_id
and t1.event_time BETWEEN t2.event_time - INTERVAL '1' HOUR AND t2.event_time + 
INTERVAL '1' HOUR



The configuration is:
state.backend=rocksdb;
state.backend.incremental=false;
state.backend.rocksdb.memory.managed=true
state.idle.retention.mintime='10 min';
state.idle.retention.maxtime='20 min';
checkpoint.time.interval='15 min';
source.idle-timeout='6 ms';

taskmanager.memory.flink.size =55 gb
taskmanager.memory.managed.fraction=0.85






Questions:
1. The checkpoint size has grown to roughly 200 GB of state.
2. The memory used by the k8s pods keeps growing and the pods end up being killed and restarted.
3. Looking at the Prometheus metrics, rocksdb_block_cache_usage exceeds rocksdb_block_cache_capacity; shouldn't rocksdb_block_cache_usage be bounded by the Flink
managed memory?


In short: how does Flink's managed memory relate to RocksDB's rocksdb_block_cache_usage?
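
Given the 200 GB checkpoints and state.backend.incremental=false above, incremental RocksDB checkpoints may also be worth trying; a minimal sketch of enabling them programmatically, with the checkpoint path as a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalRocksDBSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}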

Why does StreamTableEnvironmentImpl#create reject batch mode?

2020-12-17 Thread ????????
In org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#create there is the following check:
if (!settings.isStreamingMode()) {
    throw new TableException(
            "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}



The constructor org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#StreamTableEnvironmentImpl itself does not perform this check, so the environment can still be constructed directly.

Why was this check added to create(), and is it safe to bypass it?

Related discussion:
http://apache-flink.147419.n8.nabble.com/Blink-Planner-Remote-Env-td3162.html#a3180

Re: Constructing a Remote Env with the Blink Planner

2020-12-17 Thread 莫失莫忘
I have hit this problem as well, and I also removed the settings.isStreamingMode() check from that method. Why does the source code add such a check in the first place? It feels like
StreamTableEnvironmentImpl used to be able to run batch jobs, but with this check it no longer can. I see the latest 1.12 release still contains this check.




Re: Re: flink clickhouse connector

2020-12-17 Thread guoliubi...@foxmail.com
Yes, I have one, but when I attached it the message was too long to get through:

ezmlm-reject: fatal: Sorry, I don't accept messages larger than 100 bytes 
(#5.2.3)

Would it be convenient to send it to your personal email address?



guoliubi...@foxmail.com
 
From: magichuang
Date: 2020-12-17 20:18
To: user-zh
Subject: Re: Re: flink clickhouse connector
Are you writing in Java or PyFlink? My program is written in PyFlink, so I need a jar. Do you have one? I have a freshly installed Maven locally and am building the package,
but it keeps downloading a lot of dependencies...
 
 
> -- Original message --
> From: "guoliubi...@foxmail.com" 
> Sent: 2020-12-17 19:36:55
> To: user-zh 
> Cc:
> Subject: Re: flink clickhouse connector
>
> I am also writing data to ClickHouse. Use the official JDBC driver or a third-party one (I use https://github.com/blynkkk/clickhouse4j), and then JdbcSink can write the data; there is no need for a separate connector (see the sketch after this thread).
>
>
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
> hi, is anyone here writing data from Flink into ClickHouse? I am using the flink-connector from
> https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
> but it fails at runtime with:
>
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
>
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
>
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
>
> ... 12 more
>
>
>
>
> But I did create the traffic table in ClickHouse; the Flink version is 1.11.
>
>
>
>
> Has anyone integrated this successfully? Could you share your connector?
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
>
 
 
 
--
 
Best,
 
MagicHuang
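
A minimal Java sketch of the JdbcSink approach suggested above, assuming a ClickHouse JDBC driver is on the classpath and a table traffic(id String) exists; the URL, driver class and schema are placeholders:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClickHouseJdbcSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("1", "2", "3")
           .addSink(JdbcSink.<String>sink(
               "INSERT INTO traffic (id) VALUES (?)",
               (stmt, id) -> stmt.setString(1, id),
               new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                   .withUrl("jdbc:clickhouse://clickhouse-host:8123/default") // placeholder
                   // clickhouse4j driver class; the official driver would be ru.yandex.clickhouse.ClickHouseDriver
                   .withDriverName("cc.blynk.clickhouse.ClickHouseDriver")
                   .build()));

        env.execute("clickhouse-jdbc-sink-sketch");
    }
}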
 
 


jdbc sink fails to insert data

2020-12-17 Thread guoliubi...@foxmail.com
Hi,

I am using Flink 1.12. I have a job whose results need to go to both Kafka and a database, so I added a side output in the last step. The code looks like this:
.process(new ProcessFunction<RatioValue, RatioValue>() {
    @Override
    public void processElement(RatioValue value, Context ctx,
            Collector<RatioValue> out) throws Exception {
        out.collect(value);
        ctx.output(ratioOutputTag, value);
    }
});
sideStream.addSink(new FlinkKafkaProducer<>(
    "ratio_value",
    new RatioValueSerializationSchema(suffix),
    PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
        tool.get(SCHEMA_REGISTRY_URL)),
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
DataStream<RatioValue> ratioSideStream =
    sideStream.getSideOutput(ratioOutputTag);
ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
In practice the data lands in Kafka correctly, but the jdbc sink only works after some job restarts; after other restarts it still does not insert anything.
Running in local environment mode with a breakpoint inside JdbcSink's sink method, the breakpoint is never hit, so it looks as if the JdbcSink is never executed.
Is there any way to troubleshoot this?


guoliubi...@foxmail.com
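
One thing that may be worth checking in this situation is the JDBC batching behaviour: as far as I know, JdbcSink buffers records and by default only flushes when a batch fills up or a checkpoint completes, so with low traffic the breakpoint inside the sink may simply not be reached between flushes. Below is a hypothetical version of the JdbcSinkUtil helper that flushes more eagerly; RatioValue, its getters, the ratio_value table schema and the tool keys are placeholders inferred from the original code, not a known implementation:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class JdbcSinkUtil {
    public static SinkFunction<RatioValue> getRatioValueJdbcSinkFunction(ParameterTool tool) {
        return JdbcSink.<RatioValue>sink(
            "INSERT INTO ratio_value (id, ratio) VALUES (?, ?)",
            (stmt, v) -> {
                stmt.setString(1, v.getId());
                stmt.setDouble(2, v.getRatio());
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(100)         // flush after 100 records instead of the default
                .withBatchIntervalMs(1000)  // and at least once per second
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(tool.get("jdbc.url"))
                .withDriverName(tool.get("jdbc.driver"))
                .withUsername(tool.get("jdbc.user"))
                .withPassword(tool.get("jdbc.password"))
                .build());
    }
}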


Flink 1.11.2 reading/writing Hive, and which Hive versions are supported

2020-12-17 Thread Jacob
Dear All,

When working with Hive from Flink 1.11.2, which Hive versions are supported?


The website says 1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 2.3 and 3.1 are supported.
My environment:

*Flink : 1.11.2*
*Haoop : 2.6.0-cdh5.8.3*
*Hive : 1.1.0-cdh5.8.3*
*Job deployment : on yarn*

Also, I am not sure whether my demo for reading/writing Hive is written correctly:

public static void main(String[] args) throws Exception {

    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();

    TableEnvironment tableEnv = TableEnvironment.create(settings);

    String name = "myhive";
    String defaultDatabase = "datafeed";
    String hiveConfDir = "/opt/app/bigdata/hive-1.1.0-cdh5.8.3/conf"; // directory containing hive-site.xml
    String version = "1.1.0-cdh5.8.3";

    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");
    String createDbSql = "INSERT INTO TABLE flink2hive_test VALUES ('55', \"333\", \"CHN\")";
    tableEnv.sqlUpdate(createDbSql);
}

When this job is submitted to YARN it fails with:

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapreduce.TaskAttemptContext

Is this because MapReduce-related jars are missing?





-
Thanks!
Jacob


Re: Flink 1.12

2020-12-17 Thread Yang Wang
The latest successful checkpoint pointer is stored in the ConfigMap, as
well as the JobGraph pointer.
They could help us recover the running jobs before you delete the K8s
deployment. If the HA ConfigMaps
are deleted, then when you create a Flink cluster with the same cluster-id,
it could not recover from the latest
successful checkpoint automatically.

Best,
Yang




Boris Lublinsky wrote on Friday, December 18, 2020 at 11:42 AM:

> Also re reading
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>
> This does not seem right:
> To keep HA data while restarting the Flink cluster, simply delete the
> deployment (via kubectl delete deploy ). All the Flink
> cluster related resources will be deleted (e.g. JobManager Deployment,
> TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps
> will be retained because they do not set the owner reference. When
> restarting the cluster, all previously running jobs will be recovered and
> restarted from the latest successful checkpoint.
>
> Last successful checkpoint is not in the config maps, but rather on
> persistent volume. Config map can be safely deleted. If you restart JM, it
> will create a new leader anyways., So I would suggest to add owner
> reference there
>
>
> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
>
> Hi Boris,
>
> Thanks for your follow up response and trying the new KubernetesHAService.
>
> 1. It is a valid bug. We are not setting the service account for
> TaskManager pod. Before the KubernetesHAService is introduced, it works
> fine because the TaskManager does not need to access the K8s resource(e.g.
> ConfigMap) directly. I have created a ticket[1] to support setting service
> account for TaskManager.
> 2. If you directly delete the JobManager deployment, then the HA related
> ConfigMap will be retained. It is a by-design behavior. Because the job
> does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this
> cluster could recover in the future. If all the jobs in the application
> reach to the terminal state, all the HA related ConfigMaps will be cleaned
> up automatically. You could cancel the job and verify that. Refer here[2]
> for more information.
>
> For the PVC based storage, if it could support multiple read-write then
> the KubernetesHAService should work. Actually, it feels like a distributed
> storage.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20664
> [2].
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>
> Best,
> Yang
>
Boris Lublinsky wrote on Friday, December 18, 2020 at 7:16 AM:
>
>> And K8 native HA works,
>> But there are 2 bugs in this implementation.
>>
>> 1. Task manager pods are running as default user account, which fails
>> because it does not have access to config maps to get endpoint’s
>> information. I had to add permissions to default service account to make it
>> work. Ideally both JM and TM pods should run under the same service
>> account.
>> 2. When a Flink application is deleted, it clears the main config map,
>> but not the ones used for leader election
>>
>>
>> And finally it works fine with PVC based storage, as long as it is
>> read-write many
>>
>>
>> On Dec 15, 2020, at 8:40 PM, Yang Wang  wrote:
>>
>> Hi Boris,
>>
>> What is -p 10?
>>
>> It is same to --parallelism 10. Set the default parallelism to 10.
>>
>> does it require a special container build?
>>
>> No, the official flink docker image could be used
>> directly. Unfortunately, we do not have the image now. And we are trying to
>> figure out.
>> You could follow the instructions below to have your own image.
>>
>>
>> git clone https://github.com/apache/flink-docker.git
>> git checkout dev-master
>> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
>> cd dev/flink-1.12.0-debian
>> docker build . -t flink:flink-1.12.0
>> docker push flink:flink-1.12.0
>>
>>
>> This is if I use HDFS for save pointing, right? I can instead use PVC -
>>> based save pointing, correct?
>>
>> It is an example to storing the HA related data to OSS(Alibaba Cloud
>> Object Storage, similar to S3). Since we require a distributed storage, I
>> am afraid you could not use a PVC here. Instead, you could using a minio.
>>
>> Can I control the amount of standby JMs?
>>
>> Currently, you could not control the number of JobManagers. This is only
>> because we have not introduce a config option for it. But you could do it
>> manually via `kubectl edit deploy `. It should also work.
>>
>> Finally, what is behavior on the rolling restart of JM deployment?
>>
>> Once a JobManager terminated, it will lose the leadership and a standby
>> one will take over. So on the rolling restart of JM deployment, you will
>> find that the leader switches multiple times and your job also restarts
>> multiple times. I am not sure why you need to roll the JobManager
>> deployment. We are using deployment for 

Re: flink 1.10.1/1.11.1: state keeps growing after SQL group by and time window operations

2020-12-17 Thread Storm☀️
state.backend.incremental: was incremental checkpointing enabled when the problem occurred?





Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
Also re reading 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
 


This does not seem right:
To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via kubectl delete deploy ). All the Flink cluster 
related resources will be deleted (e.g. JobManager Deployment, TaskManager 
pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained 
because they do not set the owner reference. When restarting the cluster, all 
previously running jobs will be recovered and restarted from the latest 
successful checkpoint.

Last successful checkpoint is not in the config maps, but rather on persistent 
volume. Config map can be safely deleted. If you restart JM, it will create a 
new leader anyway, so I would suggest adding the owner reference there


> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
> 
> Hi Boris,
> 
> Thanks for your follow up response and trying the new KubernetesHAService.
> 
> 1. It is a valid bug. We are not setting the service account for TaskManager 
> pod. Before the KubernetesHAService is introduced, it works fine because the 
> TaskManager does not need to access the K8s resource(e.g. ConfigMap) 
> directly. I have created a ticket[1] to support setting service account for 
> TaskManager. 
> 2. If you directly delete the JobManager deployment, then the HA related 
> ConfigMap will be retained. It is a by-design behavior. Because the job does 
> not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this cluster 
> could recover in the future. If all the jobs in the application reach to the 
> terminal state, all the HA related ConfigMaps will be cleaned up 
> automatically. You could cancel the job and verify that. Refer here[2] for 
> more information.
> 
> For the PVC based storage, if it could support multiple read-write then the 
> KubernetesHAService should work. Actually, it feels like a distributed 
> storage.
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-20664 
> 
> [2]. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>  
> 
> 
> Best,
> Yang
> 
> Boris Lublinsky  > 于2020年12月18日周五 上午7:16写道:
> And K8 native HA works,
> But there are 2 bugs in this implementation.
> 
> 1. Task manager pods are running as default user account, which fails because 
> it does not have access to config maps to get endpoint’s information. I had 
> to add permissions to default service account to make it work. Ideally both 
> JM and TM pods should run under the same service account. 
> 2. When a Flink application is deleted, it clears the main config map, but 
> not the ones used for leader election
> 
> 
> And finally it works fine with PVC based storage, as long as it is read-write 
> many
> 
> 
>> On Dec 15, 2020, at 8:40 PM, Yang Wang > > wrote:
>> 
>> Hi Boris,
>> 
>> What is -p 10?
>> It is same to --parallelism 10. Set the default parallelism to 10.
>> 
>> does it require a special container build?
>> No, the official flink docker image could be used directly. Unfortunately, 
>> we do not have the image now. And we are trying to figure out.
>> You could follow the instructions below to have your own image.
>> 
>> 
>> git clone https://github.com/apache/flink-docker.git 
>> 
>> 
>> git checkout dev-master
>> 
>> ./add-custom.sh -u 
>> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
>>  
>> 
>>  -n flink-1.12.0
>> 
>> cd dev/flink-1.12.0-debian
>> docker build . -t flink:flink-1.12.0
>> docker push flink:flink-1.12.0
>> 
>> This is if I use HDFS for save pointing, right? I can instead use PVC - 
>> based save pointing, correct?
>> It is an example to storing the HA related data to OSS(Alibaba Cloud Object 
>> Storage, similar to S3). Since we require a distributed storage, I am afraid 
>> you could not use a PVC here. Instead, you could using a minio.
>> 
>> Can I control the amount of standby JMs? 
>> Currently, you could not control the number of JobManagers. This is only 
>> because we have not introduce a config option for it. But you could do it 
>> manually via `kubectl edit deploy `. It should also work.
>> 
>> Finally, what is behavior on the rolling restart of JM deployment?
>> Once a JobManager terminated, it will lose the leadership and a standby one 
>> will take over. So on the rolling restart of JM deployment, you will find 
>> that the leader switches multiple times and your job also restarts multiple 
>> 

Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
Thanks Yang,


> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
> 
> Hi Boris,
> 
> Thanks for your follow up response and trying the new KubernetesHAService.
> 
> 1. It is a valid bug. We are not setting the service account for TaskManager 
> pod. Before the KubernetesHAService is introduced, it works fine because the 
> TaskManager does not need to access the K8s resource(e.g. ConfigMap) 
> directly. I have created a ticket[1] to support setting service account for 
> TaskManager. 
> 2. If you directly delete the JobManager deployment, then the HA related 
> ConfigMap will be retained. It is a by-design behavior. Because the job does 
> not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this cluster 
> could recover in the future. If all the jobs in the application reach to the 
> terminal state, all the HA related ConfigMaps will be cleaned up 
> automatically. You could cancel the job and verify that. Refer here[2] for 
> more information.
> 
> For the PVC based storage, if it could support multiple read-write then the 
> KubernetesHAService should work. Actually, it feels like a distributed 
> storage.
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-20664 
> 
> [2]. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>  
> 
> 
> Best,
> Yang
> 
> Boris Lublinsky  > 于2020年12月18日周五 上午7:16写道:
> And K8 native HA works,
> But there are 2 bugs in this implementation.
> 
> 1. Task manager pods are running as default user account, which fails because 
> it does not have access to config maps to get endpoint’s information. I had 
> to add permissions to default service account to make it work. Ideally both 
> JM and TM pods should run under the same service account. 
> 2. When a Flink application is deleted, it clears the main config map, but 
> not the ones used for leader election
> 
> 
> And finally it works fine with PVC based storage, as long as it is read-write 
> many
> 
> 
>> On Dec 15, 2020, at 8:40 PM, Yang Wang > > wrote:
>> 
>> Hi Boris,
>> 
>> What is -p 10?
>> It is same to --parallelism 10. Set the default parallelism to 10.
>> 
>> does it require a special container build?
>> No, the official flink docker image could be used directly. Unfortunately, 
>> we do not have the image now. And we are trying to figure out.
>> You could follow the instructions below to have your own image.
>> 
>> 
>> git clone https://github.com/apache/flink-docker.git 
>> 
>> 
>> git checkout dev-master
>> 
>> ./add-custom.sh -u 
>> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
>>  
>> 
>>  -n flink-1.12.0
>> 
>> cd dev/flink-1.12.0-debian
>> docker build . -t flink:flink-1.12.0
>> docker push flink:flink-1.12.0
>> 
>> This is if I use HDFS for save pointing, right? I can instead use PVC - 
>> based save pointing, correct?
>> It is an example to storing the HA related data to OSS(Alibaba Cloud Object 
>> Storage, similar to S3). Since we require a distributed storage, I am afraid 
>> you could not use a PVC here. Instead, you could using a minio.
>> 
>> Can I control the amount of standby JMs? 
>> Currently, you could not control the number of JobManagers. This is only 
>> because we have not introduce a config option for it. But you could do it 
>> manually via `kubectl edit deploy `. It should also work.
>> 
>> Finally, what is behavior on the rolling restart of JM deployment?
>> Once a JobManager terminated, it will lose the leadership and a standby one 
>> will take over. So on the rolling restart of JM deployment, you will find 
>> that the leader switches multiple times and your job also restarts multiple 
>> times. I am not sure why you need to roll the JobManager deployment. We are 
>> using deployment for JobManager in Flink just because we want the JobManager 
>> to be launched once it crashed. Another reason for multiple JobManagers is 
>> to get a faster recovery.
>> 
>> 
>> Best,
>> Yang
>>  
>> 
>> Boris Lublinsky > > 于2020年12月16日周三 上午9:09写道:
>> Thanks Chesney for your quick response,
>> I read documentation 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>  
>> 
>> More carefully and found the sample, I was looking for:
>> 
>> ./bin/flink run-application -p 10 -t kubernetes-application 
>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>> -Dkubernetes.container.image=flink:k8s-ha \ 
>> 

Flink eventTime question

2020-12-17 Thread Jiazhi
Hi all, when I use SQL with a UDTF and call the tableEnv.sqlQuery() method, I get the following error: "Rowtime attributes
must not be in the input rows of a regular join. As a workaround you can cast
the time attributes of input tables to TIMESTAMP before." I used the
to_timestamp function on eventTime and it does not help. How can this be solved?


sql: select
 tmp.metric_id as metric_id,
 tmp.metric_config as metric_config,
 startLat,
 destName,
 bizType,
 orderId,
 completedTime,
 orderStatus,
 startHexList,
 cityId,
 type,
 destLat,
 endHexList,
 destLng,
 createTime,
 passengerId,
 finishedTime,
 vehicleId,
 startLng,
 startName,
 eventTime
from
 htw_order_dwd_htw_order_geo_Infos,
 lateral table(
  metricUdtf('aa')
 ) as tmp(metric_id, metric_config)


Thanks
Jiazhi
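
A minimal illustration of the workaround the error message itself suggests, namely casting the rowtime attribute to a plain TIMESTAMP before the lateral join; the column list is shortened, eventTime is assumed to be the rowtime attribute of the source table, and note that the cast removes its time-attribute property:

select
 tmp.metric_id,
 tmp.metric_config,
 t.orderId,
 t.eventTime
from (
 select orderId, CAST(eventTime AS TIMESTAMP) AS eventTime
 from htw_order_dwd_htw_order_geo_Infos
) t,
lateral table(metricUdtf('aa')) as tmp(metric_id, metric_config)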


Re: When will the flink 1.12 docker image be available?

2020-12-17 Thread Yang Wang
OK, the docker-entrypoint.sh in the current flink-docker project looks fine to me. If you run into other problems, please keep reporting them.


Best,
Yang

superainbower wrote on Friday, December 18, 2020 at 8:33 AM:

> hi, I cloned the repo again and the build works now; some of the files I downloaded earlier were probably corrupted.
>
> On Dec 17, 2020 at 14:08, Yang Wang wrote:
> You can simply clone the repo and cd into the 1.12 directory; an image built that way also works.
>
> What error do you get when starting a Flink job with the image you built? I tried it here and it runs fine.
>
>   - git clone https://github.com/apache/flink-docker.git
>   - cd scala_2.11-java8-debian
>   - sudo docker build -t flink:1.12.0 .
>   - docker push
>
>
> Best,
> Yang
>
superainbower wrote on Thursday, December 17, 2020 at 7:19 AM:

> > Quick question: is "git checkout dev-master./add-custom.sh -u
> > https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> > -n flink-1.12.0" a single command? I can't seem to run it.
> > Alternatively, can I just run
> > git clone https://github.com/apache/flink-docker.git
> > go into the 2.11 folder under the 1.12 directory, and run docker build -t flink:1.12.0 .
> > directly? The image I built that way doesn't seem to run.
> >
> On Dec 16, 2020 at 10:56, Yang Wang wrote:
> The community has hit some problems pushing the image to Docker Hub and is working on it.
> You can follow this PR for the details:
> https://github.com/docker-library/official-images/pull/9249
>
> In the meantime you can also build an image yourself as follows:
> >
> git clone https://github.com/apache/flink-docker.git
> git checkout dev-master
> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
> cd dev/flink-1.12.0-debian
> docker build . -t flink:flink-1.12.0
> docker push flink:flink-1.12.0
> >
> >
> >
jiangjiguang719 wrote on Wednesday, December 9, 2020 at 5:09 PM:

> > When will the 1.12 image be visible on Docker Hub?
> >
>


Re: Re: Re: flink sql job state size keeps increasing

2020-12-17 Thread Storm☀️
mini batch defaults to false. Did you ever find the cause of your problem?





Re: Pyflink UDF with ARRAY as input

2020-12-17 Thread Xingbo Huang
Hi Torben,

It is indeed a bug, and I have created a JIRA[1]. The work around solution
is to use the index to solve (written in release-1.12):

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        if element[0] == 2:
            my_string = element[1]
    return my_string

t = t_env.from_elements(
    [("1", [Row(3, "flink")]), ("3", [Row(2, "pyflink")]), ("2", [Row(2, "python")])],
    DataTypes.ROW(
        [DataTypes.FIELD("Key", DataTypes.STRING()),
         DataTypes.FIELD("List_element",
                         DataTypes.ARRAY(DataTypes.ROW(
                             [DataTypes.FIELD("integer_element", DataTypes.INT()),
                              DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
print(t.select(get_string_element(t.List_element)).to_pandas())



[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo

Barth, Torben wrote on Friday, December 18, 2020 at 2:46 AM:

> Dear List,
>
>
>
> I have  a table with the following structure
>
>
>
> my_table
>
> -- Key: String
>
> -- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>
>
>
>
> I want to define a udf to extract information of the “list_element”. I do
> not manage to access the information of the array in the udf. I try
> something like:
>
>
>
> @udf(result_type=DataTypes.STRING())
> def get_string_element(my_list):
>     my_string = 'xxx'
>     for element in my_list:
>         if element.integer_element == 2:
>             my_string = element.string_element
>     return my_string
>
>
>
>
>
> table_env.create_temporary_function("get_string_element",
> get_string_element)
>
> *# use the function in Python Table API*
>
> my_table.select("get_string_element(List_element)")
>
>
>
> Unfortunately, I cannot get it work. Does anybody have an idea how the
> correct way to extract the information is?
>
>
>
> Any comments or ideas are very welcome.
>
>
>
> Thanks
>
> Torben
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>


Re: Flink 1.12

2020-12-17 Thread Yang Wang
Hi Boris,

Thanks for your follow up response and trying the new KubernetesHAService.

1. It is a valid bug. We are not setting the service account for
TaskManager pod. Before the KubernetesHAService is introduced, it works
fine because the TaskManager does not need to access the K8s resource(e.g.
ConfigMap) directly. I have created a ticket[1] to support setting service
account for TaskManager.
2. If you directly delete the JobManager deployment, then the HA related
ConfigMap will be retained. It is a by-design behavior. Because the job
does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this
cluster could recover in the future. If all the jobs in the application
reach to the terminal state, all the HA related ConfigMaps will be cleaned
up automatically. You could cancel the job and verify that. Refer here[2]
for more information.

For the PVC based storage, if it could support multiple read-write then the
KubernetesHAService should work. Actually, it feels like a distributed
storage.

[1]. https://issues.apache.org/jira/browse/FLINK-20664
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html

Best,
Yang

Boris Lublinsky wrote on Friday, December 18, 2020 at 7:16 AM:

> And K8 native HA works,
> But there are 2 bugs in this implementation.
>
> 1. Task manager pods are running as default user account, which fails
> because it does not have access to config maps to get endpoint’s
> information. I had to add permissions to default service account to make it
> work. Ideally both JM and TM pods should run under the same service
> account.
> 2. When a Flink application is deleted, it clears the main config map, but
> not the ones used for leader election
>
>
> And finally it works fine with PVC based storage, as long as it is
> read-write many
>
>
> On Dec 15, 2020, at 8:40 PM, Yang Wang  wrote:
>
> Hi Boris,
>
> What is -p 10?
>
> It is same to --parallelism 10. Set the default parallelism to 10.
>
> does it require a special container build?
>
> No, the official flink docker image could be used directly. Unfortunately,
> we do not have the image now. And we are trying to figure out.
> You could follow the instructions below to have your own image.
>
>
> git clone https://github.com/apache/flink-docker.git
> git checkout dev-master
> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
> cd dev/flink-1.12.0-debian
> docker build . -t flink:flink-1.12.0
> docker push flink:flink-1.12.0
>
>
> This is if I use HDFS for save pointing, right? I can instead use PVC -
>> based save pointing, correct?
>
> It is an example to storing the HA related data to OSS(Alibaba Cloud
> Object Storage, similar to S3). Since we require a distributed storage, I
> am afraid you could not use a PVC here. Instead, you could using a minio.
>
> Can I control the amount of standby JMs?
>
> Currently, you could not control the number of JobManagers. This is only
> because we have not introduce a config option for it. But you could do it
> manually via `kubectl edit deploy `. It should also work.
>
> Finally, what is behavior on the rolling restart of JM deployment?
>
> Once a JobManager terminated, it will lose the leadership and a standby
> one will take over. So on the rolling restart of JM deployment, you will
> find that the leader switches multiple times and your job also restarts
> multiple times. I am not sure why you need to roll the JobManager
> deployment. We are using deployment for JobManager in Flink just because we
> want the JobManager to be launched once it crashed. Another reason for
> multiple JobManagers is to get a faster recovery.
>
>
> Best,
> Yang
>
>
Boris Lublinsky wrote on Wednesday, December 16, 2020 at 9:09 AM:
>
>> Thanks Chesney for your quick response,
>> I read documentation
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>> 
>> More carefully and found the sample, I was looking for:
>>
>> ./bin/flink run-application -p 10 -t kubernetes-application
>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>> -Dkubernetes.container.image=flink:k8s-ha \
>> -Dkubernetes.container.image.pull-policy=Always \
>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
>> -Dtaskmanager.numberOfTaskSlots=4 \
>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> \
>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10
>> \
>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>> \
>> 

Failed to unsubscribe from the Chinese-language mailing list

2020-12-17 Thread 肖越
I accidentally subscribed with a corporate email account and now cannot unsubscribe. Sending multiple emails to user-zh-unsubscribe has no effect either. Is there an official way to resolve this?

Re: How to configure RocksDB's block-cache-usage metric in flink 1.9.1

2020-12-17 Thread bradyMk
Hi, thanks Yun Tang for the explanation.

But can't this metric be enabled for a single job? The documentation carries this note:

"Enabling native metrics may cause degraded performance and should be set carefully" [1]

So if it is configured globally, jobs that do not use RocksDB would also try to report this metric. Wouldn't that degrade the performance of those other jobs? That does not seem quite right.


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics



-
Best Wishes

Re: Flink SQL custom UDAF fails with "No match found for function signature count_uadf()"

2020-12-17 Thread 丁浩浩
I have already solved the problem myself.

> On Dec 17, 2020, at 9:00 PM, 丁浩浩 <18579099...@163.com> wrote:
> 
> Flink version: 1.11.1
> The UDAF code comes from the Alibaba Cloud documentation.
> 
> Here is the code:
> public class TestSql {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
>         //env.setParallelism(3);
>         tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);
> 
>         Properties configs = CommonUtils.getConfigs();
>         // register the clazz source table
>         FlinkUtils.registerMysqlTable2FlinkTable(
>                 tableEnv, configs.getProperty("url"),
>                 configs.getProperty("user.name"), configs.getProperty("password"),
>                 "test", "clazz_lesson");
> 
>         Table table = tableEnv.sqlQuery("select count_uadf(clazz_number), clazz_number from clazz_lesson group by clazz_number");
>         //Table table = tableEnv.sqlQuery("select number, collect(extension_value) from clazz_extension group by number");
>         tableEnv.toRetractStream(table, Row.class).print();
>         env.execute();
>     }
> }
> 
> 
> public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
>     // Data structure of the accumulator that holds the state of the count UDAF.
>     public static class CountAccum {
>         public long total;
>     }
> 
>     @Override
>     // Initializes the accumulator of the count UDAF.
>     public CountAccum createAccumulator() {
>         CountAccum acc = new CountAccum();
>         acc.total = 0;
>         return acc;
>     }
> 
>     @Override
>     // getValue computes the result of the count UDAF from the accumulator.
>     public Long getValue(CountAccum accumulator) {
>         return accumulator.total;
>     }
> 
>     // accumulate updates the accumulator with an input value.
>     public void accumulate(CountAccum accumulator, Long iValue) {
>         accumulator.total++;
>     }
> 
>     public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
>         for (CountAccum other : its) {
>             accumulator.total += other.total;
>         }
>     }
> }
> 
> Here is the stack trace:
> 
> -
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 8 to line 1, column 31: No match 
> found for function signature count_uadf()
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>   at 
> com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 8 to line 1, column 31: No match found for function signature 
> count_uadf()
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>   at 
> 

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-17 Thread Xintong Song
I'm not aware of any significant changes to the HA components between
1.9/1.11.
Would you mind sharing the complete jobmanager/taskmanager logs?

Thank you~

Xintong Song



On Fri, Dec 18, 2020 at 8:53 AM Lu Niu  wrote:

> Hi, Xintong
>
> Thanks for replying and your suggestion. I did check the ZK side but there
> is nothing interesting. The error message actually shows that only one TM
> thought JM lost leadership while others ran fine. Also, this happened only
> after we migrated from 1.9 to 1.11. Is it possible this is introduced by
> 1.11?
>
> Best
> Lu
>
> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
> wrote:
>
>> Hi Lu,
>>
>> I assume you are using ZooKeeper as the HA service?
>>
>> A common cause of unexpected leadership lost is the instability of HA
>> service. E.g., if ZK does not receive heartbeat from Flink RM for a
>> certain period of time, it will revoke the leadership and notify
>> other components. You can look into the ZooKeeper logs checking why RM's
>> leadership is revoked.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:
>>
>>> Hi, Flink users
>>>
>>> Recently we migrated to flink 1.11 and see exceptions like:
>>> ```
>>> 2020-12-15 12:41:01,199 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
>>> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
>>> java.lang.Exception: Job leader for job id
>>> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [nrtg-1.11_deploy.jar:?]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
>>> [nrtg-1.11_deploy.jar:?]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
>>> ```
>>>
>>> ```
>>> 2020-12-15 01:01:39,531 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>>> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner ->
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360)
>>> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on
>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
>>> org.apache.flink.util.FlinkException: ResourceManager leader changed to
>>> new address null
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>> ~[nrtg-1.11_deploy.jar:?]
>>> at
>>> 

Re: flink 1.12 RocksDBStateBackend error

2020-12-17 Thread Xintong Song
https://issues.apache.org/jira/browse/FLINK-20646

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:40 PM zhisheng  wrote:

> hi, xintong
>
> Is there a corresponding issue ID?
>
> Xintong Song wrote on Thursday, December 17, 2020 at 4:48 PM:
>
> > This is indeed a bug in 1.12.0.
> > Everywhere state is used we are supposed to declare ManagedMemoryUseCase.STATE_BACKEND. The newly added
> > ReduceTransformation does not make this declaration, so any job that contains this operator and uses RocksDB runs into the problem.
> > I will open an issue right away; this will probably require the community to expedite a bugfix release.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com>
> wrote:
> >
> > > In 1.12, after setting env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > > true)) the job fails with
> > > :
> > > Caused by: java.lang.IllegalArgumentException: The fraction of memory
> to
> > > allocate should not be 0. Please make sure that all types of managed
> > memory
> > > consumers contained in the job are configured with a non-negative
> weight
> > > via
> > > `taskmanager.memory.managed.consumer-weights`.
> > >
> > > But looking at the source code, that option is at its default value.
> > > The root cause turned out to be that in StreamConfig,
> > > getManagedMemoryFractionOperatorUseCaseOfSlot is missing the key
> > > managedMemFraction.STATE_BACKEND from its config.
> > > After setting
> > > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > > the program runs normally.
> > > The code is here:
> > > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> > >
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


Re: Task manager memory usage question

2020-12-17 Thread Yangze Guo
1. JVM arguments can be added through the env.java.opts.taskmanager option (a snippet follows this thread).
2. The TM currently does not manage heap memory per slot at a fine granularity, so this is not supported in session mode.

Best,
Yangze Guo

On Fri, Dec 18, 2020 at 9:22 AM guoliubi...@foxmail.com
 wrote:
>
> Hi,
> We are on Flink 1.12, running in standalone cluster mode.
> One job on it uses a lot of memory; it fills up the heap and takes down the whole task manager.
> I would like to know how to add heap-dump JVM parameters to the task manager.
> Also, is there an option to kill only the offending job once it exhausts the heap, instead of shutting down the whole task manager? That task
> manager runs other jobs too, and they all fail together.
>
>
>
> guoliubi...@foxmail.com
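
For point 1 above, a flink-conf.yaml sketch of the heap-dump flags (the dump path is a placeholder); the kill-on-OOM flag mentioned in the reply below can be appended to the same option:

env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager.hprof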


Reply: Task manager memory usage question

2020-12-17 Thread yinghua...@163.com
I also learned this on this list; see whether it answers your question:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#env-java-opts

That option lets you configure the task manager JVM arguments. Add -XX:OnOutOfMemoryError=kill -9 
%p there so that the task manager process is killed when an OOM occurs.







yinghua...@163.com
 
From: guoliubi...@foxmail.com
Sent: 2020-12-18 09:22
To: user-zh
Subject: Task manager memory usage question
Hi,
We are on Flink 1.12, running in standalone cluster mode.
One job on it uses a lot of memory; it fills up the heap and takes down the whole task manager.
I would like to know how to add heap-dump JVM parameters to the task manager.
Also, is there an option to kill only the offending job once it exhausts the heap, instead of shutting down the whole task manager? That task 
manager runs other jobs too, and they all fail together.
 
 
 
guoliubi...@foxmail.com


Task manager memory usage question

2020-12-17 Thread guoliubi...@foxmail.com
Hi,
We are on Flink 1.12, running in standalone cluster mode.
One job on it uses a lot of memory; it fills up the heap and takes down the whole task manager.
I would like to know how to add heap-dump JVM parameters to the task manager.
Also, is there an option to kill only the offending job once it exhausts the heap, instead of shutting down the whole task manager? That task 
manager runs other jobs too, and they all fail together.



guoliubi...@foxmail.com


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-17 Thread Lu Niu
Hi, Xintong

Thanks for replying and your suggestion. I did check the ZK side but there
is nothing interesting. The error message actually shows that only one TM
thought JM lost leadership while others ran fine. Also, this happened only
after we migrated from 1.9 to 1.11. Is it possible this is introduced by
1.11?

Best
Lu

On Wed, Dec 16, 2020 at 5:56 PM Xintong Song  wrote:

> Hi Lu,
>
> I assume you are using ZooKeeper as the HA service?
>
> A common cause of unexpected leadership lost is the instability of HA
> service. E.g., if ZK does not receive heartbeat from Flink RM for a
> certain period of time, it will revoke the leadership and notify
> other components. You can look into the ZooKeeper logs checking why RM's
> leadership is revoked.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu  wrote:
>
>> Hi, Flink users
>>
>> Recently we migrated to flink 1.11 and see exceptions like:
>> ```
>> 2020-12-15 12:41:01,199 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
>> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
>> java.lang.Exception: Job leader for job id
>> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
>> ~[nrtg-1.11_deploy.jar:?]
>> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> ~[nrtg-1.11_deploy.jar:?]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> [nrtg-1.11_deploy.jar:?]
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> [nrtg-1.11_deploy.jar:?]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> [nrtg-1.11_deploy.jar:?]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [nrtg-1.11_deploy.jar:?]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
>> ```
>>
>> ```
>> 2020-12-15 01:01:39,531 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner ->
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360)
>> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on
>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
>> org.apache.flink.util.FlinkException: ResourceManager leader changed to
>> new address null
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>> ~[nrtg-1.11_deploy.jar:?]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> ~[nrtg-1.11_deploy.jar:?]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> [nrtg-1.11_deploy.jar:?]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> [nrtg-1.11_deploy.jar:?]
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> 

Reply: When will the flink 1.12 docker image be available?

2020-12-17 Thread superainbower
hi, I cloned the repo again and the build works now; some of the files I downloaded earlier were probably corrupted.

On Dec 17, 2020 at 14:08, Yang Wang wrote:
You can simply clone the repo and cd into the 1.12 directory; an image built that way also works.

What error do you get when starting a Flink job with the image you built? I tried it here and it runs fine.

  - git clone https://github.com/apache/flink-docker.git
  - cd scala_2.11-java8-debian
  - sudo docker build -t flink:1.12.0 .
  - docker push


Best,
Yang

superainbower wrote on Thursday, December 17, 2020 at 7:19 AM:

> Quick question: is "git checkout dev-master./add-custom.sh -u
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> -n flink-1.12.0" a single command? I can't seem to run it.
> Alternatively, can I just run
> git clone https://github.com/apache/flink-docker.git
> go into the 2.11 folder under the 1.12 directory, and run docker build -t flink:1.12.0 . directly? The image I built that way doesn't seem to run.
>
> On Dec 16, 2020 at 10:56, Yang Wang wrote:
> The community has hit some problems pushing the image to Docker Hub and is working on it.
> You can follow this PR for the details: https://github.com/docker-library/official-images/pull/9249
>
> In the meantime you can also build an image yourself as follows:
>
> git clone https://github.com/apache/flink-docker.git
> git checkout dev-master
> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
> cd dev/flink-1.12.0-debian
> docker build . -t flink:flink-1.12.0
> docker push flink:flink-1.12.0
>
>
>
jiangjiguang719 wrote on Wednesday, December 9, 2020 at 5:09 PM:

> > When will the 1.12 image be visible on Docker Hub?
>


Re: Flink 1.12

2020-12-17 Thread Boris Lublinsky
And K8 native HA works,
But there are 2 bugs in this implementation.

1. Task manager pods are running as default user account, which fails because 
it does not have access to config maps to get endpoint’s information. I had to 
add permissions to default service account to make it work. Ideally both JM and 
TM pods should run under the same service account. 
2. When a Flink application is deleted, it clears the main config map, but not 
the ones used for leader election


And finally it works fine with PVC based storage, as long as it is read-write 
many


> On Dec 15, 2020, at 8:40 PM, Yang Wang  wrote:
> 
> Hi Boris,
> 
> What is -p 10?
> It is same to --parallelism 10. Set the default parallelism to 10.
> 
> does it require a special container build?
> No, the official flink docker image could be used directly. Unfortunately, we 
> do not have the image now. And we are trying to figure out.
> You could follow the instructions below to have your own image.
> 
> 
> git clone https://github.com/apache/flink-docker.git 
> 
> 
> git checkout dev-master
> 
> ./add-custom.sh -u 
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
>  
> 
>  -n flink-1.12.0
> 
> cd dev/flink-1.12.0-debian
> docker build . -t flink:flink-1.12.0
> docker push flink:flink-1.12.0
> 
> This is if I use HDFS for save pointing, right? I can instead use PVC - based 
> save pointing, correct?
> It is an example to storing the HA related data to OSS(Alibaba Cloud Object 
> Storage, similar to S3). Since we require a distributed storage, I am afraid 
> you could not use a PVC here. Instead, you could using a minio.
> 
> Can I control the amount of standby JMs? 
> Currently, you could not control the number of JobManagers. This is only 
> because we have not introduce a config option for it. But you could do it 
> manually via `kubectl edit deploy `. It should also work.
> 
> Finally, what is behavior on the rolling restart of JM deployment?
> Once a JobManager terminated, it will lose the leadership and a standby one 
> will take over. So on the rolling restart of JM deployment, you will find 
> that the leader switches multiple times and your job also restarts multiple 
> times. I am not sure why you need to roll the JobManager deployment. We are 
> using deployment for JobManager in Flink just because we want the JobManager 
> to be launched once it crashed. Another reason for multiple JobManagers is to 
> get a faster recovery.
> 
> 
> Best,
> Yang
>  
> 
> Boris Lublinsky  > 于2020年12月16日周三 上午9:09写道:
> Thanks Chesney for your quick response,
> I read documentation 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>  
> 
> More carefully and found the sample, I was looking for:
> 
> ./bin/flink run-application -p 10 -t kubernetes-application 
> -Dkubernetes.cluster-id=k8s-ha-app1 \
> -Dkubernetes.container.image=flink:k8s-ha \ 
> -Dkubernetes.container.image.pull-policy=Always \
> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>  \
> -Dhigh-availability.storageDir=oss://flink/flink-ha \
> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>  \
> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>  \
> local:///opt/flink/examples/streaming/StateMachineExample.jar <>
> 
> A couple of questions about it:
> 
> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application 
> -t. What is -p 10?
> -Dkubernetes.container.image=flink:k8s-ha does it require a special container 
> build?
> 
> -Dhigh-availability.storageDir=oss://flink/flink-ha \
> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>  \
> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>  \
> 
> This is if I use HDFS for save pointing, right? I can instead use PVC - based 
> save pointing, correct?
> 
> Also I was trying to understand, how it works, and from the documentation it 
> sounds like there is one active and one or 
> more standby JMs. Can I control the amount of standby JMs?
> 
> Finally, what is behavior on the rolling restart of JM deployment?
> 
> 
> 
> 
>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler > > wrote:
>> 
>> Unfortunately no; there are some discussions going on in the 

Pyflink UDF with ARRAY as input

2020-12-17 Thread Barth, Torben
Dear List,

I have  a table with the following structure

my_table
-- Key: String
-- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>

I want to define a udf to extract information of the “list_element”. I do not 
manage to access the information of the array in the udf. I try something like:

@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        if element.integer_element == 2:
            my_string = element.string_element
    return my_string


table_env.create_temporary_function("get_string_element", get_string_element)
# use the function in Python Table API
my_table.select("get_string_element(List_element)")

Unfortunately, I cannot get it to work. Does anybody have an idea what the correct 
way to extract the information is?

Any comments or ideas are very welcome.

Thanks
Torben



Re: flink 1.12 RocksDBStateBackend error

2020-12-17 Thread zhisheng
Hi Xintong,

Is there a corresponding issue ID?

Xintong Song  于2020年12月17日周四 下午4:48写道:

> This is indeed a bug in 1.12.0.
> Everywhere state is used we should declare ManagedMemoryUseCase.STATE_BACKEND. A newly added
> ReduceTransformation misses this declaration, so any job that contains this operator and uses RocksDB runs into the problem.
> I will open an issue right away; we may need to push the community for an urgent bugfix release.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:
>
> > In 1.12, after setting env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > true)) the job fails with:
> > Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> > allocate should not be 0. Please make sure that all types of managed
> memory
> > consumers contained in the job are configured with a non-negative weight
> > via
> > `taskmanager.memory.managed.consumer-weights`.
> >
> > But looking at the source code, that option is left at its default value.
> > The root cause turned out to be that in StreamConfig's
> > getManagedMemoryFractionOperatorUseCaseOfSlot the config is missing the
> > key managedMemFraction.STATE_BACKEND. After setting
> > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > the program runs normally.
> > The code is as follows:
> > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


flink sql read hive table throw java.lang.ArrayIndexOutOfBoundsException: 1024

2020-12-17 Thread house??????
When I use PyFlink Hive SQL to read data and insert it into ES, it throws the following 
exception. The environment: Flink 1.11.2, 
flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar, Hive 3.1.2


2020-12-17 21:10:24,398 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: HiveTableSource(driver_id, driver_base_lc_p1, 
driver_90d_lc_p1, driver_30d_lc_p1, driver_14d_lc_p1, driver_180d_lc_p1, 
vehicle_base_lc_p1, driver_active_zone, is_incremental, dt) TablePath: 
algorithm.jiawei_oas_driver_features_for_incremental_hive2kafka, 
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 8, 9] - 
Calc(select=[driver_id, is_incremental, dt, () AS bdi_feature_create_time]) 
- Sink: 
Sink(table=[default_catalog.default_database.0_demo4_903157246_tmp], 
fields=[driver_id, is_incremental, dt, bdi_feature_create_time]) (1/1) 
(98f4259c3d00fac9fc3482a4cdc8df3c) switched from RUNNING to FAILED. at 
org.apache.orc.impl.ConvertTreeReaderFactory$AnyIntegerTreeReader.nextVector(ConvertTreeReaderFactory.java:445)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1300) 
~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader.reachedEnd(HiveVectorizedOrcSplitReader.java:99)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2] 2020-12-17 21:10:24,402 INFO  
org.apache.flink.runtime.taskmanager.Task[] - Freeing task 
resources for Source: HiveTableSource(driver_id, driver_base_lc_p1, 
driver_90d_lc_p1, driver_30d_lc_p1, driver_14d_lc_p1, driver_180d_lc_p1, 
vehicle_base_lc_p1, driver_active_zone, is_incremental, dt) TablePath: 
algorithm.jiawei_oas_driver_features_for_incremental_hive2kafka, 
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 8, 9] - 
Calc(select=[driver_id, is_incremental, dt, () AS bdi_feature_create_time]) 
- Sink: 
Sink(table=[default_catalog.default_database.0_demo4_903157246_tmp], 
fields=[driver_id, is_incremental, dt, bdi_feature_create_time]) (1/1) 
(98f4259c3d00fac9fc3482a4cdc8df3c). java.lang.ArrayIndexOutOfBoundsException: 
1024 at 
org.apache.flink.orc.shim.OrcShimV210.nextBatch(OrcShimV210.java:35) 
~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.orc.shim.OrcShimV210.nextBatch(OrcShimV210.java:29) 
~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.orc.OrcSplitReader.ensureBatch(OrcSplitReader.java:134) 
~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.orc.impl.TreeReaderFactory$LongTreeReader.nextVector(TreeReaderFactory.java:612)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:269)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.orc.impl.ConvertTreeReaderFactory$StringGroupFromAnyIntegerTreeReader.nextVector(ConvertTreeReaderFactory.java:1477)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2012)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.orc.OrcSplitReader.reachedEnd(OrcSplitReader.java:101) 
~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.reachedEnd(HiveTableInputFormat.java:261)
 ~[flink-sql-connector-hive-3.1.2_2.11-1.11.2.jar:1.11.2] at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2] 2020-12-17 21:10:24,406 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor

So, how can I solve this problem?

Re: flink1.9.1: how to configure RocksDB's block-cache-usage metric

2020-12-17 Thread Yun Tang
Hi

The options that enable these metrics go into the Flink configuration (flink-conf.yaml); they are not meant to be enabled directly in your code.
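For example, a minimal sketch of the flink-conf.yaml entries (assuming your Flink version ships these options; block-cache-usage may not be available in 1.9):

state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.block-cache-usage: true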

Best
Yun Tang

From: bradyMk 
Sent: Thursday, December 17, 2020 20:56
To: user-zh@flink.apache.org 
Subject: Re: flink1.9.1: how to configure RocksDB's block-cache-usage metric

Thanks Yun Tang for the explanation!

One more question: I enabled the cur-size-all-mem-tables metric in my code as follows:
// set up the RocksDB state backend with incremental checkpoints enabled
val backend = new RocksDBStateBackend(path, true)

// metric options
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()

// predefined options
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

// enable RocksDB
env.setStateBackend(backend.asInstanceOf[StateBackend])

But the metric is never reported. Am I enabling it the wrong way in code?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL custom UDAF function: No match found for function signature count_uadf()

2020-12-17 Thread 丁浩浩
Flink version: 1.11.1
The UDAF code comes from the Alibaba Cloud documentation.

The code is below:
public class TestSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
        //env.setParallelism(3);
        tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

        Properties configs = CommonUtils.getConfigs();
        // register the clazz source table
        FlinkUtils.registerMysqlTable2FlinkTable(
            tableEnv, configs.getProperty("url"),
            configs.getProperty("user.name"),
            configs.getProperty("password"),
            "test", "clazz_lesson");

        Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number");
        //Table table = tableEnv.sqlQuery("select number,collect(extension_value) from clazz_extension group by number ");
        tableEnv.toRetractStream(table, Row.class).print();
        env.execute();
    }
}



public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // data structure of the accumulator that holds the count UDAF's state
    public static class CountAccum {
        public long total;
    }

    @Override
    // initialize the count UDAF's accumulator
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    @Override
    // getValue computes the count UDAF's result from the accumulator holding the state
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate updates the accumulator holding the count UDAF's state for each input value
    public void accumulate(CountAccum accumulator, Long iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}

The stack trace is as follows:

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 

Re: flink1.9.1: how to configure RocksDB's block-cache-usage metric

2020-12-17 Thread bradyMk
Thanks Yun Tang for the explanation!

One more question: I enabled the cur-size-all-mem-tables metric in my code as follows:
// set up the RocksDB state backend with incremental checkpoints enabled
val backend = new RocksDBStateBackend(path, true)

// metric options
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()

// predefined options
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

// enable RocksDB
env.setStateBackend(backend.asInstanceOf[StateBackend])

But the metric is never reported. Am I enabling it the wrong way in code?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-17 Thread Till Rohrmann
Flink should try to pick the latest checkpoint and will only use the
savepoint if no newer checkpoint could be found.

Cheers,
Till

On Wed, Dec 16, 2020 at 10:13 PM vishalovercome  wrote:

> I'm not sure if this addresses the original concern. For instance consider
> this sequence:
>
> 1. Job starts from savepoint
> 2. Job creates a few checkpoints
> 3. Job manager (just one in kubernetes) crashes and restarts with the
> commands specified in the kubernetes manifest which has the savepoint path
>
> Will Zookeeper based HA ensure that this savepoint path will be ignored?
>
> I've asked this and various other questions here -
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Does flink have a plan to support flink sql udf of any language?

2020-12-17 Thread Till Rohrmann
Hi Josh,

Currently Flink supports Java, Scala and Python for defining SQL UDFs.
There are no concrete plans to extend the set of supported languages at the
moment.

In general, these kinds of contributions are always welcome. What we have
to make sure is to see how it fits into the overall story. In order to
assess this, I would recommend writing to d...@flink.apache.org to propose
your idea and the solution proposal.

Cheers,
Till

On Thu, Dec 17, 2020 at 11:53 AM Joshua Fan  wrote:

> Hi,
>
> Does the flink community have a plan to support flink sql udf in any
> language? For example, a udf in c or php. Because in my company, many
> developers do not know java or scala, they use c in their usual work.
>
> Now we have a workaround to support this situation by creating a process
> running the c logic, and the process communicates with flink sql by a
> wrapper udf function.
>
> So, here I have two questions:
> 1.  Does the flink community have a plan to support flink sql udf in any
> language?
> 2. If the answer to question 1 is no, is there any possibility to push our
> solution to flink? Is this udf process stuff in line with the ideas of the
> community about flink sql?
>
> Thanks
> Yours Sincerely
> Josh
>
>


Re: Re: flink clickhouse connector

2020-12-17 Thread magichuang
您是用java写的还是pyflink  啊?  我是用pyflink写的程序,所以需要一个jar包,您那里有嘛,我本地是新安装的maven,在打包  
但是一直在下载依赖好多。。


> -- Original message --
> From: "guoliubi...@foxmail.com" 
> Sent: 2020-12-17 19:36:55
> To: user-zh 
> Cc:
> Subject: Re: flink clickhouse connector
>
> I also write data to ClickHouse. Use the official or another third-party JDBC driver (I use https://github.com/blynkkk/clickhouse4j) together with JdbcSink and the writes just work; there is no need for a separate connector.
>
>
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
> Hi, is anyone using Flink to write data into ClickHouse? I am using the flink-connector from
> https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
> but it fails at runtime:
>
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
>
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
>
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
>
> ... 12 more
>
>
>
>
> But I did create the table `traffic` in ClickHouse. The Flink version is 1.11.
>
>
>
>
> Has anyone got this working? Could you share your connector?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
>



--

Best,

MagicHuang




flink-1.11.1 checkpoint timeouts cause the job to restart repeatedly

2020-12-17 Thread nicygan
Dear all,
I have a Flink streaming job with a checkpoint interval of 5 minutes and a checkpoint timeout of 3 minutes.
The job calls a third-party API. Under normal conditions there is no problem, and a normal checkpoint only takes about 80 ms.


 But when the third-party API gets congested, some calls time out (the API call timeout is set to 5 seconds),
 and then the checkpoint of this operator times out:
 checkpoint 3179 of job  expired before completing

 trying to recover from a global failure
 exceeded checkpoint tolerable failure threshold

 The job then restarts and recovers from the last successful checkpoint.
 At the next checkpoint interval it times out again, restarts again, and recovers from that same checkpoint again.
 It keeps restarting and rolling back to that checkpoint like this, so the data in the stream can no longer be consumed.


 Why does a checkpoint timeout cause the job to restart, and can the restart be avoided?
 Records whose third-party calls time out can be handled separately later, but the restarts stop the data from being consumed.


thanks
/nicygan


Re: flink clickhouse connector

2020-12-17 Thread liang zhao
I use a third-party driver, clickhouse-native-jdbc, and write via JDBC.
> On Dec 17, 2020, at 18:41, magichuang  wrote:
> 
> Hi, is anyone using Flink to write data into ClickHouse? I am using the flink-connector from
> https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
> but it fails at runtime:
> 
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
> 
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
> 
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> 
> at java.lang.Thread.run(Thread.java:748)
> 
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
> 
> ... 12 more
> 
> 
> 
> 
> But I did create the table `traffic` in ClickHouse. The Flink version is 1.11.
> 
> 
> 
> 
> Has anyone got this working? Could you share your connector?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Best,
> 
> MagicHuang
> 
> 
> 
> 
> 



Re: flink clickhouse connector

2020-12-17 Thread guoliubi...@foxmail.com
I also write data to ClickHouse. Use the official or another third-party JDBC driver (I use https://github.com/blynkkk/clickhouse4j) together with JdbcSink and the writes just work; there is no need for a separate connector.
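In case it is useful, a minimal sketch of that approach (the table name/columns, the stream element type and the clickhouse4j driver class name are assumptions, not taken from the job above):

// imports: org.apache.flink.connector.jdbc.JdbcSink, JdbcExecutionOptions, JdbcConnectionOptions;
//          org.apache.flink.streaming.api.functions.sink.SinkFunction;
//          org.apache.flink.api.java.tuple.Tuple2
static SinkFunction<Tuple2<String, Long>> clickHouseSink() {
    return JdbcSink.sink(
        "INSERT INTO traffic (ip, cnt) VALUES (?, ?)",
        (ps, row) -> {
            ps.setString(1, row.f0);
            ps.setLong(2, row.f1);
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)
            .withBatchIntervalMs(200)
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:clickhouse://localhost:8123/default")
            .withDriverName("cc.blynk.clickhouse.ClickHouseDriver")
            .build());
}
// usage: stream.addSink(clickHouseSink());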



guoliubi...@foxmail.com
 
From: magichuang
Date: 2020-12-17 18:41
To: user-zh
Subject: flink clickhouse connector
Hi, is anyone using Flink to write data into ClickHouse? I am using the flink-connector from
https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
but it fails at runtime:
 
Caused by: java.io.IOException: unable to establish connection to ClickHouse
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
 
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
 
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 
at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 
at java.lang.Thread.run(Thread.java:748)
 
Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed 
table
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
 
... 12 more
 
 
 
 
But I did create the table `traffic` in ClickHouse. The Flink version is 1.11.
 
 
 
 
Has anyone got this working? Could you share your connector?
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
Best,
 
MagicHuang
 
 
 
 
 


Flink 1.12.0 / 1.11.0 runs into the problem described in FLINK-19677

2020-12-17 Thread 李延延
Hello, while testing the 1.12 release I deployed one JobManager and three TaskManagers on virtual machines.
Environment:
1. CentOS 7 VMs with dual NICs (one on the .4 subnet, one on the .18 subnet)
2. 1 JobManager node, 3 TaskManager nodes, HA not enabled


I found that the hostname resolves to the IP of one of the NICs (the .18 subnet), while Flink's masters/workers files are all configured with .4-subnet IPs, and most importantly jobmanager.rpc.address is also set to the .4 subnet. After starting the cluster, submitting the WordCount example fails with the same error as FLINK-19677.
Redeploying Flink 1.10.0 on the same nodes with the same IP configuration, the job can be submitted and runs successfully.


For Flink 1.12.0 the final fix was either switching the IPs to the .18 subnet or using hostnames instead; both resolve the problem. So this issue was probably introduced starting with Flink 1.11.0.





Does flink have a plan to support flink sql udf of any language?

2020-12-17 Thread Joshua Fan
Hi,

Does the flink community have a plan to support flink sql udf in any
language? For example, a udf in c or php. Because in my company, many
developers do not know java or scala, they use c in their usual work.

Now we have a workaround to support this situation by creating a process
running the c logic, and the process communicates with flink sql by a
wrapper udf function.

So, here I have two questions:
1.  Does the flink community have a plan to support flink sql udf in any
language?
2. If the answer to question 1 is no, is there any possibility to push our
solution to flink? Is this udf process stuff in line with the ideas of the
community about flink sql?

Thanks
Yours Sincerely
Josh


flink clickhouse connector

2020-12-17 Thread magichuang
Hi, is anyone using Flink to write data into ClickHouse? I am using the flink-connector from
https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
but it fails at runtime:

Caused by: java.io.IOException: unable to establish connection to ClickHouse

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)

at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed 
table

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)

... 12 more




But I did create the table `traffic` in ClickHouse. The Flink version is 1.11.




Has anyone got this working? Could you share your connector?



















Best,

MagicHuang







Re: when will the flink 1.12 docker image be available

2020-12-17 Thread superainbower
Hello, after building the image and submitting jobmanager-job.yaml to K8s, the JobManager does not start. The logs report that line 102 of the image's docker-entrypoint.sh
is missing a ")". Comparing docker-entrypoint.sh between the 1.12 and 1.11 images, line 102 in 1.12
(_args=("${_args[@]:1}")) belongs to a newly added function:

disable_jemalloc_env() {
    # use nameref '_args' to update the passed 'args' within function
    local -n _args=$1
    if [ "${_args[0]}" = ${COMMAND_DISABLE_JEMALLOC} ]; then
        echo "Disable Jemalloc as the memory allocator"
        _args=("${_args[@]:1}")
    else
        echo "Enable Jemalloc as the memory allocator via appending env variable LD_PRELOAD with /usr/lib/x86_64-linux-gnu/libjemalloc.so"
        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
    fi
}



Also, I checked the 1.12 documentation; the yaml files should be no different from 1.11, and the original yaml files run fine on version 1.11.1.
superainbower
superainbo...@163.com


On Dec 17, 2020 at 14:08, Yang Wang wrote:
You can simply clone the repo and cd into the 1.12 directory; an image built that way also works.

What error do you get when starting a Flink job with the image you built? I tried it on my side and it runs fine.

- git clone https://github.com/apache/flink-docker.git
- cd scala_2.11-java8-debian
- sudo docker build -t flink:1.12.0 .
- docker push


Best,
Yang

superainbower  于2020年12月17日周四 上午7:19写道:

May I ask: is "git checkout dev-master./add-custom.sh -u
https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
-n flink-1.12.0" a single command? It does not seem to run.
Also, can I just
git clone https://github.com/apache/flink-docker.git
go into the 2.11 folder under the 1.12 directory and run "docker build -t flink:1.12.0 ." directly? The image I built that way does not seem to run.

On Dec 16, 2020 at 10:56, Yang Wang wrote:
The community ran into a problem while pushing the image to Docker Hub and is working on it.
You can follow this PR for the details: https://github.com/docker-library/official-images/pull/9249

For now you can also build an image yourself, as follows:

git clone https://github.com/apache/flink-docker.git
git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
cd dev/flink-1.12.0-debian
docker build . -t flink:flink-1.12.0
docker push flink:flink-1.12.0



jiangjiguang719 wrote on Wed, Dec 9, 2020 at 5:09 PM:

When will the 1.12 image be visible on Docker Hub?



Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-17 Thread Timo Walther

Hi Dan,

are you intending to use interval joins, regular joins, or a mixture of 
both?


For regular joins you must ensure to cast a rowtime attribute to 
timestamp as early as possible. For interval joins, you need to make 
sure that the rowtime attribute is unmodified.


Currently, I see

COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS 
TIMESTAMP(3))) AS insertion_ts


or

CAST(flat_impression_view.impression_ts AS TIMESTAMP)

which disables interval joins implicitly.

If you would like to keep the interval join properties, you need to do 
the casting in a computed column in the CREATE TABLE statement, before 
declaring a watermark for it.
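
For example, a minimal sketch of such a declaration (assuming `ts` arrives as epoch milliseconds; the column names and connector options are placeholders, not taken from your setup):

CREATE TABLE input_impression (
  impression_id STRING,
  insertion_id STRING,
  ts BIGINT,
  -- computed column: do the conversion once, here
  impression_ts AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000)),
  -- declare the watermark on the computed column so it stays a rowtime attribute
  WATERMARK FOR impression_ts AS impression_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'impressions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

Joins that compare `impression_ts` using BETWEEN ... INTERVAL bounds can then still be recognized as interval joins.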


Regards,
Timo



On 15.12.20 18:47, Dan Hill wrote:
When I try to refactor my joins into a temporary view to share joins and 
state, I get the following error.  I tried a few variations of the code 
snippets below (adding TIMESTAMP casts based on Google searches).  I 
removed a bunch of fields to simplify this example.


Is this a known issue?  Do I have a simple coding bug?

CREATE TEMPORARY VIEW `flat_impression_view` AS

SELECT

DATE_FORMAT(input_impression.ts, '-MM-dd') AS dt,

input_insertion.log_user_id AS insertion_log_user_id,

COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS 
TIMESTAMP(3))) AS insertion_ts,


input_insertion.insertion_id AS insertion_insertion_id,

COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS 
TIMESTAMP(3))) AS impression_ts,


input_impression.impression_id AS impression_impression_id,

input_impression.insertion_id AS impression_insertion_id,

FROM input_insertion

JOIN input_impression

ON input_insertion.insertion_id = input_impression.insertion_id

AND CAST(input_insertion.ts AS TIMESTAMP) BETWEEN 
CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR AND 
CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR



INSERT INTO `flat_impression_w_click`

SELECT

dt,

insertion_log_user_id,

CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,

insertion_insertion_id,

CAST(impression_ts AS TIMESTAMP(3)) AS mpression_ts,

impression_impression_id,

impression_insertion_id,

COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) 
AS click_ts,


COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,

COALESCE(input_click.impression_id, EmptyByteArray()) AS 
click_impression_id,


FROM flat_impression_view

LEFT JOIN input_click

ON flat_impression_view.impression_impression_id = input_click.impression_id

AND CAST(flat_impression_view.impression_ts AS TIMESTAMP) BETWEEN 
CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR AND 
CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR



java.lang.RuntimeException: Failed to executeSql=...

...

Caused by: org.apache.flink.table.api.TableException: Cannot generate a 
valid execution plan for the given query:


FlinkLogicalLegacySink(name=[...])

+- FlinkLogicalCalc(select=[...])

+- FlinkLogicalJoin(condition=[AND(=($36, $45), 
 >=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6), 
4320:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL, 
+(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)))], joinType=[left])


:- FlinkLogicalCalc(select=[...])

:+- FlinkLogicalJoin(condition=[AND(=($5, $35), 
 >=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 4320:INTERVAL 
HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6), 
360:INTERVAL HOUR)))], joinType=[inner])


: :- FlinkLogicalDataStreamTableScan(table=[[default, mydb, 
input_insertion]])


: +- FlinkLogicalDataStreamTableScan(table=[[default, mydb, 
input_impression]])


+- FlinkLogicalDataStreamTableScan(table=[[default, mydb, input_click]])


Rowtime attributes must not be in the input rows of a regular join. As a 
workaround you can cast the time attributes of input tables to TIMESTAMP 
before.


Please check the documentation for the set of currently supported SQL 
features.


at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)


at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)


at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)


at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)


at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)


at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)


at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at 

Re: Flink - sending clicks+impressions to AWS Personalize

2020-12-17 Thread Timo Walther

Hi Dan,

the exception that you get is a very frequent limitation in Flink SQL at 
the moment.


I tried to summarize the issue recently here:

https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296

The query is quite complex. It seems that some JOIN is not recognized as 
a streaming interval join. Maybe you can split up the big query into 
individual subqueries and verify the plan using 
`TableEnvironment.explainSql()` to figure out which join causes the 
exception.
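
For instance, a minimal sketch (assuming a table environment named tEnv and that the first join of the big query has been extracted into its own statement; the column and table names below are taken from your query):

String firstJoin =
    "SELECT insertion.insertion_id, impression.impression_id "
  + "FROM insertion JOIN impression "
  + "ON insertion.insertion_id = impression.insertion_id "
  + "AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR "
  + "AND impression.ts + INTERVAL '1' HOUR";
// the printed plan shows whether this join is translated into an interval join
System.out.println(tEnv.explainSql(firstJoin));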


Regards,
Timo


On 16.12.20 03:40, Dan Hill wrote:
I want to try using AWS Personalize 
 to get content recommendations.  
One of the fields on the input (click) event is a list of recent 
impressions.


E.g.
{
   ...
   eventType: 'click',
   eventId: 'click-1',
   itemId: 'item-1'
   impression: ['item-2', 'item-3', 'item-4', 'item-5', ],
}

Is there a way to produce this output using Flink SQL?

I tried doing a version of this but get the following error:
"Rowtime attributes must not be in the input rows of a regular join. As 
a workaround you can cast the time attributes of input tables to 
TIMESTAMP before."


Here is a simplified version of the query.


SELECT

   "user".user_id AS userId,

"view".session_id AS sessionId,  click.click_id AS eventId,

   CAST(click.ts AS BIGINT) AS sentAt,

   insertion.content_id AS itemId,

   impression_content_ids AS impression

FROM "user"

RIGHT JOIN "view"

   ON "user".log_user_id = "view".log_user_id

   AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + 
INTERVAL '1' HOUR


JOIN insertion

   ON view.view_id = insertion.view_id

   AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR   AND 
insertion.ts + INTERVAL '1' HOUR


JOIN impression  ON insertion.insertion_id = impression.insertion_id

   AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND 
impression.ts + INTERVAL '1' HOUR


JOIN (

   SELECT log_user_id, CAST(COLLECT(DISTINCT impression_content_id) AS 
ARRAY) AS impression_content_ids


FROM (

   SELECT insertion.log_user_id AS log_user_id,

   ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY 
impression.ts DESC) AS row_num,


       insertion.content_id AS impression_content_id

     FROM insertion

     JOIN impression

     ON insertion.insertion_id = impression.insertion_id

       AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND 
impression.ts + INTERVAL '1' HOUR


     GROUP BY insertion.log_user_id, impression.ts, insertion.content_id

) WHERE row_num <= 25

GROUP BY log_user_id

) ON insertion.insertion_id = impression.insertion_id

AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND 
impression.ts + INTERVAL '1' HOUR  LEFT JOIN click


ON impression.impression_id = click.impression_id

     AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND 
click.ts + INTERVAL '12' HOUR"






Re: Set TimeZone of Flink Streaming job

2020-12-17 Thread Timo Walther

Hi,

Flink does not support time zones currently. However, all time 
operations work on Java `long` values. It is up to the user what 
this long value represents. It does not have to be UTC but can also be adjusted 
for another time zone. Since the DataStream API supports arbitrary Java 
objects, users can convert the long value to LocalDateTime or even 
OffsetDateTime if necessary. Windows are always aligned to the point 
where the long value equals 0.
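
As an illustration, a minimal sketch (the Asia/Shanghai zone and the -8 hour window offset are assumptions for a UTC+8 deployment):

// imports: java.time.Instant, java.time.LocalDateTime, java.time.ZoneId,
//          org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows,
//          org.apache.flink.streaming.api.windowing.time.Time

// interpret an epoch-millis event timestamp as local wall-clock time
long epochMillis = 1608192000000L;
LocalDateTime local = Instant.ofEpochMilli(epochMillis)
        .atZone(ZoneId.of("Asia/Shanghai"))
        .toLocalDateTime();

// align 1-day event-time windows with local midnight in UTC+8 via the window offset
TumblingEventTimeWindows dailyLocalWindows =
        TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8));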


Regards,
Timo


On 16.12.20 15:48, narasimha wrote:

Hi,

How to configure flink job to follow a certain TimeZone, instead of 
default/UTC.

Is it possible in the first place?

Solutions present are for Table/SQL API.

--
A.Narasimha Swamy




Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-17 Thread Timo Walther

Hi,

I gave some answers in the other mail thread. Some additional comment: 
In general I think even configuration can be considered as state in this 
case. If state is not set, the job can be considered as a fresh start. 
Once the state is set, it would basically be just a configuration 
update. You could set timers to periodically query either an external 
service or just a distributed file system for an updated version of the 
configuration.


If you want to avoid excessive state access, you could also cache the 
state's content in a transient variable.


Regards,
Timo


On 16.12.20 21:50, vishalovercome wrote:

Will this work - In main method, serialize config into a string and store it
using ParameterTool with key as taskName and value as config (serialized as
string). Then in the open method, lookup the relevant configuration using
getTaskName().

A follow up to this would be configuring custom windowing functions. I have
a size as well as a time based window class where size and time limits are
configurable and passed as constructor arguments. How to change this
configuration when state persistence/recovery is enabled? A window doesn't
have an open method per se





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: state inside functions

2020-12-17 Thread Timo Walther

Hi,

if you would like to dynamically adjust configuration of your streaming 
job, it might be a good approach to consider the configuration as a 
stream itself.


The connect() API can be used to connect a main stream with a control 
stream. In any case the configuration should be persisted in state if it 
should be present after restore. Otherwise, you need to implement a 
logic where the operator must query the latest configuration from some 
external system which could become the bottleneck.
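
A minimal sketch of the control-stream pattern with broadcast state (the String element types and the "threshold" key are assumptions, not part of your job):

// imports: org.apache.flink.api.common.state.MapStateDescriptor,
//          org.apache.flink.api.common.typeinfo.Types,
//          org.apache.flink.streaming.api.datastream.*,
//          org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction,
//          org.apache.flink.util.Collector
MapStateDescriptor<String, String> configDescriptor =
        new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

BroadcastStream<String> configBroadcast = configStream.broadcast(configDescriptor);

DataStream<String> result = mainStream
        .connect(configBroadcast)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx,
                                       Collector<String> out) throws Exception {
                // read the latest configuration; it is part of checkpointed state
                String threshold = ctx.getBroadcastState(configDescriptor).get("threshold");
                out.collect(value + " (threshold=" + threshold + ")");
            }

            @Override
            public void processBroadcastElement(String newValue, Context ctx,
                                                Collector<String> out) throws Exception {
                // update the configuration whenever a new control record arrives
                ctx.getBroadcastState(configDescriptor).put("threshold", newValue);
            }
        });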


Regards,
Timo


On 16.12.20 22:07, vishalovercome wrote:

When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, then it seems as though changing configuration at a
later time doesn't work as it uses state to restore older configuration. How
can we pass configuration while having the flexibility to change the values
at a later date?

I've started another discussion with many more questions -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





flink-sql????-??????????state

2020-12-17 Thread ??????
?? flink 
sql??cdccdc??state??
 state??
val config: TableConfig = tabEnv.getConfig
config.setIdleStateRetention(Duration.ofHours(1))

Is there a convenient way to print in pyflink, like .print() in the Java API?

2020-12-17 Thread huang huang
print(page_turn.to_pandas())
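
# alternatively, a minimal sketch assuming Flink 1.12 and a Table named page_turn;
# TableResult.print() writes the rows to stdout without declaring a sink table
page_turn.execute().print()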

> You can collect the results to the client [1], or take a look at the other ways to emit results [2]: [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.table.html#pyflink.table.TableResult.collect
>   [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/intro_to_table_api.html#emit-results
> On Dec 17, 2020 at 2:47 PM, 肖越 wrote: I have recently been trying out pyflink
> and only found the connector-based way of emitting output, for example:  sink_ddl = '''  CREATE TABLE print_sink 
> (  ID DOUBLE,  NAME STRING  ) WITH (  'connector' = 'print' 
>  )  '''  You always have to define the output table schema up front. Is there a more convenient way to print?


曹 三启 added you to the flink group

2020-12-17 Thread 曹 三启


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-17 Thread Till Rohrmann
Hi,

if you start a Flink job from a savepoint and the job needs to recover,
then it will only reuse the savepoint if no later checkpoint has been
created. Flink will always use the latest checkpoint/savepoint taken.

Cheers,
Till

On Wed, Dec 16, 2020 at 9:47 PM vishalovercome  wrote:

> My flink job runs in kubernetes. This is the setup:
>
> 1. One job running as a job cluster with one job manager
> 2. HA powered by zookeeper (works fine)
> 3. Job/Deployment manifests stored in Github and deployed to kubernetes by
> Argo
> 4. State persisted to S3
>
> If I were to stop (drain and take a savepoint) and resume, I'll have to
> update the job manager manifest with the savepoint location and save it in
> Github and redeploy. After deployment, I'll presumably have to modify the
> manifest again to remove the savepoint location so as to avoid starting the
> application from the same savepoint. This raises some questions:
>
> 1. If the job manager were to crash before the manifest is updated again
> then won't kubernetes restart the job manager from the savepoint rather
> than
> the latest checkpoint?
> 2. Is there a way to ensure that restoration from a savepoint doesn't
> happen
> more than once? Or not after first successful checkpoint?
> 3. If even one checkpoint has been finalized, then the job should prefer
> the
> checkpoint rather than the savepoint. Will that happen automatically given
> zookeeper?
> 4. Is it possible to not have to remove the savepoint path from the
> kubernetes manifest and simply rely on newer checkpoints/savepoints? It
> feels rather clumsy to have to add and remove back manually. We could use a
> cron job to remove it but its still clumsy.
> 5. Is there a way of asking flink to use the latest savepoint rather than
> specifying the location of the savepoint? If I were to manually rename the
> s3 savepoint location to something fixed (s3://fixed_savepoint_path_always)
> then would there be any problem restoring the job?
> 6. Any open source tool that solves this problem?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: flink 1.12 RocksDBStateBackend error

2020-12-17 Thread Xintong Song
This is indeed a bug in 1.12.0.
Everywhere state is used we should declare ManagedMemoryUseCase.STATE_BACKEND. A newly added
ReduceTransformation misses this declaration, so any job that contains this operator and uses RocksDB runs into the problem.
I will open an issue right away; we may need to push the community for an urgent bugfix release.

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:

> In 1.12, after setting env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> true)) the job fails with:
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> allocate should not be 0. Please make sure that all types of managed memory
> consumers contained in the job are configured with a non-negative weight
> via
> `taskmanager.memory.managed.consumer-weights`.
>
> But looking at the source code, that option is left at its default value.
> The root cause turned out to be that in StreamConfig's
> getManagedMemoryFractionOperatorUseCaseOfSlot the config is missing the
> key managedMemFraction.STATE_BACKEND. After setting
> config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> the program runs normally.
> The code is as follows:
> https://paste.ubuntu.com/p/9WrBz3Xrc6/
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: idle state retention is set when using SQL, but the state keeps growing

2020-12-17 Thread Storm☀️
I hit the same problem on flink 1.10.1: the TTL is set but does not take effect. Did the original poster manage to solve it?
*sql*:
select
* 
from 
xx
group by 
TUMBLE(monitor_processtime, INTERVAL '60' SECOND),topic_identity

*A 60-second window with a retention time of 2 minutes, yet the state in the checkpoints keeps growing*

*tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(5)); *
   



--
Sent from: http://apache-flink.147419.n8.nabble.com/