Flink SQL custom source: getting the Kafka topic information when parsing weblogs

2021-01-18 Posted by gimlee
I previously used Spark for weblog parsing, building a unique deduplication key from each record's topic, partition, and offset, like this:
String key =
MD5Utils.string2MD5(record.topic() + record.partition() + record.offset());
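For reference, the Spark-side key above can be reproduced in plain Java; MD5Utils is the poster's own helper, so the JDK MessageDigest stands in for it here:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Stand-in for the MD5Utils.string2MD5 helper from the post: hex-encoded MD5
// of topic + partition + offset, used as a deduplication key.
public class DedupKey {
    public static String md5Hex(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(s.getBytes(StandardCharsets.UTF_8))) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always present in the JDK
        }
    }

    public static String dedupKey(String topic, int partition, long offset) {
        // same concatenation as record.topic() + record.partition() + record.offset()
        return md5Hex(topic + partition + offset);
    }

    public static void main(String[] args) {
        System.out.println(dedupKey("weblog", 0, 42L));
    }
}
```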

I have now switched to a custom Flink SQL source to parse the weblogs, but while parsing I find that the Kafka-related information is no longer available. The overridden method is:
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions) {}

Is there any way to obtain the Kafka-related information (topic, partition, offset) inside a custom source?
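A side note in case the built-in Kafka SQL connector is an option instead of a fully custom source: as of Flink 1.12 the Kafka connector can, to my knowledge, expose per-record Kafka metadata as table columns, which gives exactly the topic/partition/offset needed for the dedup key. A sketch (table name, field names, and broker address are made up):

```sql
CREATE TABLE weblog (
  `line`      STRING,
  -- read-only Kafka metadata, filled in per record by the connector
  `topic`     STRING METADATA VIRTUAL,
  `partition` INT    METADATA VIRTUAL,
  `offset`    BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic'     = 'weblog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'raw'
);

-- reproduces the Spark-side MD5 dedup key
SELECT MD5(CONCAT(`topic`, CAST(`partition` AS STRING), CAST(`offset` AS STRING)))
FROM weblog;
```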



--
Sent from: http://apache-flink.147419.n8.nabble.com/


PyFlink batch job submission fails with "Failed to execute sql"

2021-01-18 Posted by 岳坤
Hi, a question: I have been using PyFlink recently and frequently hit a "Failed to execute
sql" error when submitting batch jobs. The error printed to the terminal is: Traceback (most recent call last): File
"/opt/flink/ha_store/test/device_status_statistics.py", line

Custom offset storage for Flink

2021-01-18 Posted by zjfpla...@hotmail.com
Hi,
Does Flink have a way to store Kafka offsets in a custom store (e.g. Redis) instead of relying on checkpoints/savepoints, similar to Spark
Streaming's "your own data store" feature?
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store



zjfpla...@hotmail.com
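As far as I know Flink has no direct equivalent of Spark's "your own data store" hook; the usual pattern is to load start offsets from your store and pass them to the consumer (e.g. via FlinkKafkaConsumer#setStartFromSpecificOffsets), and to write offsets back out from the pipeline yourself. The store half can be sketched in plain Java, with a HashMap standing in for Redis (all names here are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an external offset store, with a Map standing in for Redis.
// save() would be called from the job once records are durably processed;
// load() before building the consumer, to produce its start offsets.
public class OffsetStore {
    private final Map<String, Long> store = new HashMap<>(); // "topic:partition" -> offset

    public void save(String topic, int partition, long offset) {
        // with Redis this might be e.g. jedis.hset("offsets:" + topic, ...)
        store.put(topic + ":" + partition, offset);
    }

    public Map<String, Long> load(String topic) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : store.entrySet()) {
            if (e.getKey().startsWith(topic + ":")) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        OffsetStore s = new OffsetStore();
        s.save("weblog", 0, 100L);
        s.save("weblog", 1, 200L);
        System.out.println(s.load("weblog"));
    }
}
```

Note that this gives you Spark-style manual offset management, but also Spark-style delivery semantics: exactly-once then depends on how atomically you commit offsets together with results.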



Re: Re: Re: How does Flink read Kafka offsets

2021-01-18 Posted by air23
Yes, recovery works that way.
But with setStartFromGroupOffsets(), if the job crashes midway,
the consumer may have already committed offsets for records that were not fully processed, or that never reached the sink.
On the next restart, consumption resumes from the offsets committed to Kafka,
so the unprocessed records are lost.
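The loss scenario described above can be simulated with a toy consumer, independent of Kafka and Flink (everything below is an illustration, not Flink API): committing the offset before the record reaches the sink means a crash in between skips that record on restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration of at-most-once behavior: commit-before-process loses
// records when a crash happens between the offset commit and the sink.
public class CommitFirstDemo {
    // Consume `log` from `committed`; simulate a crash right after committing
    // the offset for index `crashAt` but before sinking that record.
    static int runUntilCrash(List<String> log, int committed, List<String> sink, int crashAt) {
        for (int offset = committed; offset < log.size(); offset++) {
            committed = offset + 1;                  // commit first (like auto-commit)
            if (offset == crashAt) return committed; // crash before sinking
            sink.add(log.get(offset));               // then process/sink
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c");
        List<String> sink = new ArrayList<>();
        int committed = runUntilCrash(log, 0, sink, 1); // crash while handling "b"
        runUntilCrash(log, committed, sink, -1);        // restart from committed offset
        System.out.println(sink);                       // "b" was committed but never sunk
    }
}
```

This is why the checkpoint-based offsets (restoring from a savepoint/checkpoint) are the safe path: the offset and the processing state are committed atomically.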

On 2021-01-18 10:50:44, "hoose" <307840...@qq.com> wrote:
>My understanding is that even though we are not restoring from a savepoint, the Kafka
>consumer topic still carries our previously saved group id, and Flink's default is:
>
>setStartFromGroupOffsets(): the default; start from the offsets currently recorded for the consumer group, i.e. pick up from the last committed offsets.
>
>So it is correct to say consumption still continues from the last committed offset, right?
>
>
>


Startup exception with 1.12.0 in Yarn per-job mode

2021-01-18 Posted by guanyq
The error looks like a conflict with hadoop-common-2.7.4.jar, but I don't know how to resolve it.
Help!
2021-01-19 15:12:47,922 ERROR org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
 [] - Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
Could not start the ResourceManager 
akka.tcp://flink@dn138.hadoop.unicom:45554/user/rpc/resourcemanager_0
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:220)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:183)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:551)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:172)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.12.0.jar:1.12.0]
Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep(JLjava/util/concurrent/TimeUnit;)Lorg/apache/hadoop/io/retry/RetryPolicy;
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:280) 
~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRetryPolicy(RMProxy.java:211) 
~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.createRetriableProxy(RequestHedgingRMFailoverProxyProvider.java:95)
 ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:77)
 ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:190)
 ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:120) 
~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94) 
~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
 ~[hadoop-yarn-common-2.7.3.2.6.0.3-8.jar:?]
at 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.serviceStart(AMRMClientImpl.java:186)
 ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) 
~[FlinkDataProcess.jar:?]
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.serviceStart(AMRMClientAsyncImpl.java:93)
 ~[hadoop-yarn-client-2.7.3.2.6.0.3-8.jar:?]
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193) 
~[FlinkDataProcess.jar:?]
at 
org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:159)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:80)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
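For NoSuchMethodError conflicts like this, it often helps to confirm which jar the offending class (here org.apache.hadoop.io.retry.RetryPolicies) is actually loaded from at runtime. A small self-contained sketch; run it with the same classpath as the failing job (the class names in main are just examples):

```java
import java.security.CodeSource;

// Diagnostic sketch: print the jar (or directory) a class was loaded from,
// to find which of several conflicting hadoop jars wins on the classpath.
public class WhichJar {
    static String locationOf(String className) {
        try {
            Class<?> c = Class.forName(className);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "bootstrap classloader (JDK)" : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // On the real cluster, query the class from the stack trace:
        System.out.println("RetryPolicies: "
                + locationOf("org.apache.hadoop.io.retry.RetryPolicies"));
        // Sanity check with a class that is always present:
        System.out.println("String: " + locationOf("java.lang.String"));
    }
}
```

If the printed jar is not the Hadoop version your YARN cluster runs, aligning the versions (or shading/excluding the duplicate in your fat jar) is the usual fix.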

Table program cannot be compiled

2021-01-18 Posted by Asahi Lee
Hi! On Flink
1.12.0, a call into org.apache.flink.table.runtime.generated.CompileUtils.compile() fails for a DataStream-to-Table
program with: "Table program cannot be compiled. This is a bug. Please file an
issue." The method it cannot compile is: public abstract java.lang.Object
org.apache.flink.api.java.functions.KeySelector.getKey(java.lang.Object)

Re: Getting the query results returned by Flink SQL

2021-01-18 Posted by xingoo
I built exactly this feature recently. The basic idea is as Jeff described: after the job is submitted to the cluster, sink the results into memory.
Then start a socket server locally; the data sunk into memory is sent straight back over the socket.

For a concrete implementation you can refer to Zeppelin, or to ChangelogCollectStreamResult in the sql-client module.
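The mechanism described above can be sketched without Flink: a local ServerSocket plays the collector, and a plain thread stands in for the sink writing results back (all names below are invented for illustration):

```java
import java.io.*;
import java.net.*;
import java.util.*;

// Minimal sketch of the collect-over-socket pattern: the "job" side writes
// results to a socket, the local side gathers them. In the real setup the
// writer would be a Flink sink running on the cluster.
public class SocketCollectDemo {
    public static List<String> collect(List<String> results) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // local collector, any free port
            int port = server.getLocalPort();
            Thread sink = new Thread(() -> {              // stands in for the remote sink
                try (Socket s = new Socket("localhost", port);
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    for (String r : results) out.println(r);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sink.start();
            List<String> received = new ArrayList<>();
            try (Socket client = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) received.add(line);
            }
            sink.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect(Arrays.asList("row1", "row2")));
    }
}
```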




Flink 1.11.3 PackagedProgram startup error

2021-01-18 Posted by xingoo
Dear All:
Environment: flink-1.11.3
Background: I want to launch a Flink jar through PackagedProgram (simulating the flink run flow).
Code:
String[] programArgs = new String[]{};
String jarFilePath = 
"D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar";
List<URL> classpaths = Collections.singletonList(new
URL("file://D:\\workspace\\FlinkJarDemo\\target\\FlinkJarDemo-1.0-SNAPSHOT.jar"));

Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.ATTACHED, true);
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
configuration.setString("execution.target", "local");
configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
configuration.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
 
"org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;org.apache.flink.api.java.functions.KeySelector;");

// Get assembler class
String entryPointClass = "a.b.c.Test";
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(jarFile)
.setUserClassPaths(classpaths)
.setEntryPointClassName(entryPointClass)
.setConfiguration(configuration)
//.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
.setArguments(programArgs)
.build();

ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(),
configuration,
program,
false,
false
);
Running it produces:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could 
not instantiate outputs in order.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1150)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1134)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:284)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:271)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:73)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:69)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1372)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:699)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
at java.util.ArrayList.readObject(ArrayList.java:799)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2294)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 

Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Posted by Xingbo Huang
Thanks Xintong for the great work!

Best,
Xingbo

Peter Huang wrote on Tue, Jan 19, 2021 at 12:51 PM:

> Thanks for the great effort to make this happen. It paves the way for us to
> use 1.12 soon.
>
> Best Regards
> Peter Huang
>
> On Mon, Jan 18, 2021 at 8:16 PM Yang Wang  wrote:
>
> > Thanks Xintong for the great work as our release manager!
> >
> >
> > Best,
> > Yang
> >
> > Xintong Song wrote on Tue, Jan 19, 2021 at 11:53 AM:


Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Posted by macdoor
Sure, how should I send it to you?





Flink SQL job submission: only one of the SQL statements ever executes

2021-01-18 Posted by 花乞丐
I am currently using Flink CDC
to read the MySQL binlog and send it to Kafka, then reading the Kafka messages with Flink and finally writing into Hive. But when I submit the code to YARN, two jobs are submitted, and both of them execute insert
into kafka.order_info; insert into
ods.order_info is never executed, and the program reports no errors at all. The code is below. Am I submitting the job the wrong way, or is something else wrong? Submit command: flink run -m
yarn-client -ynm mysql-cdc-2-hive -ys 3 -yjm 4g -ytm 8g -c
com.zallsteel.flink.app.log.MySQLCDC2HiveApp -d
/opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar

package com.zallsteel.flink.app.log;


import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;

/**
 *
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
public static void main(String[] args) {
// Obtain the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Set parallelism
env.setParallelism(6);
// Enable checkpointing
env.enableCheckpointing(6);

env.getConfig().setAutoWatermarkInterval(200);
// Configure the Flink SQL environment
EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// Create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
tableEnvSettings);
// Set the checkpointing mode
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMinutes(1));
// Catalog name
String catalogName = "devHive";
// Create the HiveCatalog
HiveCatalog hiveCatalog = new HiveCatalog(
catalogName,
"default",
"/etc/hive/conf"
);
// Register the Hive catalog
tableEnv.registerCatalog(catalogName, hiveCatalog);
// Use the Hive catalog
tableEnv.useCatalog(catalogName);
// Create the mysql-cdc source database
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
// Create the mysql-cdc source table
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
"  ) WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'hdp-xxx-dev-node01',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = 'phkC4DE4dM28$PUD',\n" +
"'database-name' = 'cdc_test',\n" +
"'table-name' = 'order_info'\n" +
")");
// Create the kafka source
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
   
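The code is truncated here, so this is a guess, but a common cause of this symptom is submitting each INSERT with its own tableEnv.executeSql(...) call, which fires one job per statement. If that is the case, grouping the inserts in a StatementSet submits them together as a single job (a sketch against the tableEnv above; the SELECT bodies are placeholders):

```java
// Sketch: batch all INSERT statements into one StatementSet so they are
// optimized and submitted as a single job, instead of one job per executeSql().
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO kafka.order_info SELECT * FROM cdc.order_info");
stmtSet.addInsertSql("INSERT INTO ods.order_info SELECT * FROM kafka.order_info");
stmtSet.execute(); // one job containing both sinks
```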

Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Posted by Forward Xu
Thanks Xintong for the great work!


Best,

Forward

Yang Wang wrote on Tue, Jan 19, 2021 at 12:16 PM:

> Thanks Xintong for the great work as our release manager!
>
>
> Best,
> Yang
>
> Xintong Song wrote on Tue, Jan 19, 2021 at 11:53 AM:


Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Posted by Yang Wang
Thanks Xintong for the great work as our release manager!


Best,
Yang

Xintong Song wrote on Tue, Jan 19, 2021 at 11:53 AM:



Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Posted by Yang Wang
There are indeed a lot of "Connecting websocket" and "Scheduling reconnect task" entries in the log,
so I still think the network between your Pod and the APIServer is not very stable.

Also, if possible, please send the complete DEBUG-level JobManager log.

Best,
Yang

macdoor wrote on Tue, Jan 19, 2021 at 9:31 AM:

> Thanks! I turned on DEBUG logging; there is still only that one final ERROR, but before it
> there are quite a few entries containing kubernetes.client.dsl.internal.WatchConnectionManager.
> I grepped a portion; can you tell anything from them?
>

Re: How to specify a custom log4j configuration when submitting jobs with Flink on Yarn

2021-01-18 Posted by Yang Wang
Concretely, have a look at the code of the YarnClusterDescriptor and YarnLogConfigUtil classes;
they show how the log4j configuration file is discovered and how it is registered as a LocalResource so that Yarn distributes it.

Best,
Yang

Bobby <1010445...@qq.com> wrote on Mon, Jan 18, 2021 at 11:17 PM:



Reading 50+ standardized SQL statements from Excel, executing them with Flink, and writing the results to MySQL: what is a good implementation approach?

2021-01-18 Posted by 18293503878
Has anyone provided data support for a real-time dashboard this way? I need to wrap up logic that reads multiple standardized SQL statements saved in an Excel file (possibly 50+ statements), executes them with Flink, and writes the computed results to MySQL with a fixed set of fields. What is a good way to implement this?

[ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Posted by Xintong Song
The Apache Flink community is very happy to announce the release of Apache
Flink 1.12.1, which is the first bugfix release for the Apache Flink 1.12
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2021/01/19/release-1.12.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xintong


Re: Periodic jobmanager restarts with 1.12.1 in K8s HA session mode

2021-01-18 Posted by macdoor
Thanks! I turned on DEBUG logging; there is still only that one final ERROR, but before it
there are quite a few entries containing kubernetes.client.dsl.internal.WatchConnectionManager.
I grepped a portion; can you tell anything from them?

job-debug-0118.log:2021-01-19 02:12:25,551 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:25,646 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Connecting websocket ...
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
job-debug-0118.log:2021-01-19 02:12:25,647 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:30,128 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Connecting websocket ...
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5a9fa83e
job-debug-0118.log:2021-01-19 02:12:30,176 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
closing the watch
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2553d42c
job-debug-0118.log:2021-01-19 02:12:39,028 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Closing websocket
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@15b15029
job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
WebSocket close received. code: 1000, reason: 
job-debug-0118.log:2021-01-19 02:12:39,030 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force
closing the watch
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@2cdbe5a0
job-debug-0118.log:2021-01-19 02:12:39,031 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Closing websocket
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@1e3f5396
job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
WebSocket close received. code: 1000, reason: 
job-debug-0118.log:2021-01-19 02:12:39,033 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:12:42,677 DEBUG
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] -
Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
job-debug-0118.log:2021-01-19 02:12:42,678 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:42,920 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
job-debug-0118.log:2021-01-19 02:12:42,921 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:12:45,130 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@4b318628
job-debug-0118.log:2021-01-19 02:12:45,132 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket successfully opened
job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@278d8398
job-debug-0118.log:2021-01-19 02:13:05,927 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@69d1ebd2
job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 1000, reason: 
job-debug-0118.log:2021-01-19 02:13:05,930 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Ignoring onClose for already closed/closing websocket
job-debug-0118.log:2021-01-19 02:13:05,940 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Force closing the watch io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@210aab4b
job-debug-0118.log:2021-01-19 02:13:05,940 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - Closing websocket org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket@3db9d8d8
job-debug-0118.log:2021-01-19 02:13:05,942 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager [] - WebSocket close received. code: 

Re: How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18, by Bobby
First of all, thanks for providing the solution. I will go try it.

About the statement that "when deploying on Yarn, Flink relies on the file name log4j.properties to ship the resource, so you cannot manually specify a different file": how should I understand this? Could you point me to related material so I can study the Flink
on Yarn deployment logic?

thx.


Yang Wang wrote
> When deploying on Yarn, Flink relies on the file name log4j.properties to ship the resource, so you cannot manually specify a different file.
> 
> However, you can export a FLINK_CONF_DIR=/path/of/your/flink-conf environment variable
> and put your own flink-conf.yaml and log4j.properties in that directory.
> 
> Best,
> Yang
> 
> Bobby <1010445050@...> wrote on Mon, Jan 18, 2021 at 7:18 PM:
> 
>> Flink on Yarn reads the log4j.properties logging configuration from flink/conf by default.
>> Is there a way to specify a self-written log4j.properties when submitting a Flink job?
>> thx.
>>
>>
>> Flink version: 1.9.1
>> Deployment mode: Flink on Yarn
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18, by Bobby
11



--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18, by Yang Wang
When deploying on Yarn, Flink relies on the file name log4j.properties to ship the resource, so you cannot manually specify a different file.

However, you can export a FLINK_CONF_DIR=/path/of/your/flink-conf environment variable
and put your own flink-conf.yaml and log4j.properties in that directory.

Best,
Yang
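
The approach above can be sketched in shell (the conf-dir path, log4j contents, and job jar below are assumptions for illustration; the actual `flink run` call is shown commented out):

```shell
# Build a private conf dir holding both flink-conf.yaml and a custom
# log4j.properties, then point the flink CLI at it via FLINK_CONF_DIR.
CONF_DIR="$(mktemp -d)"

# A minimal log4j 1.x style config (Flink 1.9.x uses log4j 1.x by default;
# the contents here are only an example).
cat > "$CONF_DIR/log4j.properties" <<'EOF'
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
EOF

# flink-conf.yaml must sit in the same directory; copy your real one here.
touch "$CONF_DIR/flink-conf.yaml"

export FLINK_CONF_DIR="$CONF_DIR"
echo "Using FLINK_CONF_DIR=$FLINK_CONF_DIR"
# flink run -m yarn-cluster ./my-job.jar   # hypothetical jar; submit as usual
```

The CLI then ships the log4j.properties found in $FLINK_CONF_DIR instead of the one in flink/conf.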

Bobby <1010445...@qq.com> wrote on Mon, Jan 18, 2021 at 7:18 PM:

> Flink on Yarn reads the log4j.properties logging configuration from flink/conf by default.
> Is there a way to specify a self-written log4j.properties when submitting a Flink job?
> thx.
>
>
> Flink version: 1.9.1
> Deployment mode: Flink on Yarn
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Periodic jobmanager restarts on 1.12.1 in K8s HA session mode

2021-01-18, by Yang Wang
You can use iperf to test the network; you need to install it in the image in advance.

Also, you can turn on debug logging to check whether the Watch only fails after many reconnect retries.

Best,
Yang
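
To act on the debug-log suggestion with Flink 1.12's default log4j2 setup, a fragment along these lines could be appended to conf/log4j.properties (the logger name is taken from the stack traces in this thread; treat the exact keys as an assumption):

```properties
# Raise the fabric8 watch client to DEBUG so each reconnect attempt is logged
logger.fabric8watch.name = io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager
logger.fabric8watch.level = DEBUG
```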

macdoor wrote on Mon, Jan 18, 2021 at 7:08 PM:

> I checked the earlier logs and found no "too old resource version"; several
> consecutive logs show no other error, just this one, then a restart, and then a new log begins.
>
> The k8s cluster I use does seem to have a rather unstable network. How can I test the network between the Pod and the APIServer so that it clearly shows the problem? ping? Or some other tool?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Getting query results back from Flink SQL

2021-01-18, by Jeff Zhang
Sink the results into memory and then handle them yourself (print them, or send them to a web frontend).
You can refer to the Zeppelin source code: https://github.com/apache/zeppelin/tree/master/flink


黑色 wrote on Mon, Jan 18, 2021 at 8:42 PM:

> A question: I want to implement something like the sql-client, or like Zeppelin, where I write select * from ... on a page,
>
>
> and the returned results are displayed below it, the way Zeppelin shows query output under the SQL editor. How can this be implemented?
> Could anyone take a look?
>
>
> Right now, to see results in Flink SQL I still have to define a sink with='print' and go check the UI page, which is too cumbersome.



-- 
Best Regards

Jeff Zhang


Getting query results back from Flink SQL

2021-01-18, by 黑色
A question: I want to implement something like the sql-client, or like Zeppelin: I write select * from ... on a page
and the returned results are displayed below it,


the way Zeppelin shows query output under the SQL editor. How can this be implemented?
Could anyone take a look?


Right now, to see results in Flink SQL I still have to define a sink with='print' and go check the UI page.

How to specify a custom log4j configuration when submitting a Flink job in Flink on Yarn deployment mode

2021-01-18, by Bobby
Flink on Yarn reads the log4j.properties logging configuration from flink/conf by default.
Is there a way to specify a self-written log4j.properties when submitting a Flink job?
thx.


Flink version: 1.9.1
Deployment mode: Flink on Yarn



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Periodic jobmanager restarts on 1.12.1 in K8s HA session mode

2021-01-18, by macdoor
I checked the earlier logs and found no "too old resource version"; several
consecutive logs show no other error, just this one, then a restart, and then a new log begins.

The k8s cluster I use does seem to have a rather unstable network. How can I test the network between the Pod and the APIServer so that it clearly shows the problem? ping? Or some other tool?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Which metrics to choose for alerting when monitoring Flink jobs with Grafana

2021-01-18, by gimlee
Are you reporting metrics via the pushgateway?
With that approach, the pushed data items have to be actively deleted on the Prometheus/pushgateway side before the metric drops to 0.
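
As a hedged sketch of that cleanup (the pushgateway address and job label below are assumptions; `DELETE /metrics/job/<job>` is the standard Pushgateway endpoint for dropping a pushed metric group):

```shell
# Metrics pushed to the Pushgateway persist until explicitly deleted, so a
# stopped Flink job keeps reporting its last value instead of dropping to 0.
PUSHGATEWAY="http://pushgateway:9091"   # assumed address
JOB="myFlinkJob"                        # assumed job label used when pushing
URL="$PUSHGATEWAY/metrics/job/$JOB"
echo "DELETE $URL"
# curl -s -X DELETE "$URL"              # uncomment to actually drop the group
```

After the group is deleted, Prometheus stops scraping the stale series, so alerts can key off the metric's absence instead of a frozen last value.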



--
Sent from: http://apache-flink.147419.n8.nabble.com/

How to write statements calling custom Table Aggregate Functions in Flink SQL

2021-01-18, by jiangwan
A question: the official docs show statements for calling Table Aggregate Functions from the Table API. Is it possible to call a custom Table
Aggregate Function (which returns multiple rows and allows renaming) from Flink SQL?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


A question about Flink checkpoints

2021-01-18, by gimlee
Set the state.checkpoints.num-retained value; it looks like checkpoints beyond that count are deleted automatically.
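
That setting is a flink-conf.yaml entry; a minimal fragment (the value 3 is only an example):

```yaml
# Keep only the N most recent completed checkpoints; once the limit is
# exceeded, older checkpoints are removed automatically.
state.checkpoints.num-retained: 3
```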



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Periodic jobmanager restarts on 1.12.1 in K8s HA session mode

2021-01-18, by Yang Wang
Search the logs for "too old resource version" errors.
Also, test the network between the Pod and the APIServer; check whether it drops frequently.

Best,
Yang

macdoor wrote on Mon, Jan 18, 2021 at 9:45 AM:

> It restarts roughly every few tens of minutes. Any ideas on how to investigate? The error thrown is the same every time, and a lot of ConfigMaps also accumulate over time. Below is a concrete error.
>
> Error content:
>
> 2021-01-17 04:16:46,116 ERROR
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_275]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_275]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-01-17 04:16:46,117 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> 

Re: Large numbers of jobmanager-leader ConfigMaps after running Flink 1.12.0 HA on native K8s for a while

2021-01-18, by Yang Wang
It looks like the error occurs while watching. What does your K8s environment look like? An unstable network between the Pod and the K8s APIServer can cause this problem.

On my side I have tested on both minikube and Alibaba Cloud ACK clusters; in long runs (over a week) there were no JM restarts caused by "too old resource version" or the like.

Since several people have reported this problem, it will be fixed in the next 1.12 bug-fix release (1.12.2).


Best,
Yang


macdoor wrote on Mon, Jan 18, 2021 at 9:45 AM:

> Hello, I have just started using Flink 1.12.1 HA on k8s and found that the
> jobmanager restarts roughly every half hour, always with this error. Have you run into it? Thanks!
>
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Suspending
> SlotPool.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> [] - Close ResourceManager connection 28ed7c84e7f395c5a34880df91b251c6:
> Stopping JobMaster for job p_port_traffic_5m@hive->mysql @2021-01-17
> 11:40:00(67fb9b15d0deff998e287aa7e2cf1c7b)..
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
> SlotPool.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager
> 8c450d0051eff8c045adb76cb9ec4...@akka.tcp://flink@flink-jobmanager
> :6123/user/rpc/jobmanager_32
> for job 67fb9b15d0deff998e287aa7e2cf1c7b from the resource manager.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
> [] - Closing
>
> KubernetesLeaderElectionDriver{configMapName='test-flink-etl-67fb9b15d0deff998e287aa7e2cf1c7b-jobmanager-leader'}.
> 2021-01-17 04:52:12,399 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
> [] - The watcher is closing.
> 2021-01-17 04:52:12,416 INFO
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
> job graph 67fb9b15d0deff998e287aa7e2cf1c7b from
>
> KubernetesStateHandleStore{configMapName='test-flink-etl-dispatcher-leader'}.
> 2021-01-17 04:52:30,686 ERROR
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
> at
>
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.12.1.jar:1.12.1]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_275]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_275]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-01-17 04:52:30,691 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
> while watching the ConfigMap
> test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
> at

Re: Are there plans for Flink native K8s to support hostAlias configuration?

2021-01-18, by Yang Wang
For features that are not used very often, the community plans to support them uniformly through pod templates.
I believe that should cover your requirement.
This is more flexible and more extensible; see this JIRA for details [1].

There is already a draft PR; once it is finished, a formal PR will be submitted and reviewed.
You can also try it out early and report any problems.

[1]. https://issues.apache.org/jira/browse/FLINK-15656

Best,
Yang
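
For reference, a hostAliases entry in a Kubernetes pod template is a plain PodSpec fragment like the one below (the IP and hostname are placeholders; how Flink consumes the template is what FLINK-15656 defines and was not yet released at the time of this thread):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template    # placeholder name
spec:
  hostAliases:                     # standard Kubernetes PodSpec field
    - ip: "192.168.1.10"           # placeholder IP
      hostnames:
        - "internal.example.host"  # placeholder hostname
```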

高函 wrote on Mon, Jan 18, 2021 at 11:13 AM:

>
>
> Does the community plan to support configuring hostAliases in native k8s mode?
> If I extend it myself, do I need to add the corresponding hostAliases config options in the module and build a custom docker image?
> Thanks~
>


Re: Setting the MapStateDescriptor of a broadcast stream in Flink

2021-01-18, by smq
Thanks, I had misunderstood at first.



Sent from my iPhone


-- Original message --
From: 赵一旦