Re: flink: setting the MapStateDescriptor for a broadcast stream

2021-01-17 by 赵一旦
The key and value types are whatever you choose; nothing is forced on you.
Whatever types your map state's key and value need in your concrete business scenario, those are the TypeInformation you set there.
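A minimal sketch of what that looks like in code (the String key/value types, the "rules" name and the ruleStream variable are placeholder choices for illustration, not anything the API requires):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// a broadcast-state descriptor keyed by rule name (String) with a String value;
// any TypeInformation pair that fits your business data works just as well
MapStateDescriptor<String, String> ruleStateDescriptor =
        new MapStateDescriptor<>(
                "rules",                            // state name
                BasicTypeInfo.STRING_TYPE_INFO,     // key type of the map state
                BasicTypeInfo.STRING_TYPE_INFO);    // value type of the map state

// the same descriptor is used both to broadcast the stream and to read the state later
DataStream<String> ruleStream = null;   // placeholder: your rule/config stream
BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleStateDescriptor);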

smq <374060...@qq.com> wrote on Mon, Jan 18, 2021 at 12:18 PM:

> Sent from my iPhone
>
>
> -- Original message --
> From: 明启 孙 <374060...@qq.com>
> Sent: Jan 18, 2021, 11:30
> To: user-zh  Subject: Fwd: flink: setting the MapStateDescriptor for a broadcast stream
>
>
>
>
> Hi all:
>
>    In MapStateDescriptor (String name, TypeInformation keyTypeInfo,
> TypeInformation valueTypeInfo), what does the second parameter, the key, refer to? The value refers to the state value. Most examples online use String for the key, and I do not know what this key should be based on.
>
>  
>
> smq
>
>  


flink 1.10.1: merge method of a Scala AggregateFunction not found, is this a Flink bug?

2021-01-17 by bigdata
Hi,
I get the following error:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction
com.autoai.cns.udaf.PercentileUDAF'
The merge method is written in Scala rather than Java (see below). On
flink 1.10.1, is it a Flink bug that this merge method is not matched?

def merge(accumulator: ListBuffer[Float], its: java.lang.Iterable[ListBuffer[Float]]): Unit = {
  //its.forEach(i => accumulator ++ i)
  val iter = its.iterator()
  while (iter.hasNext) {
    val a = iter.next()
    accumulator ++ a
  }
}
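For reference, a sketch of a merge that actually mutates the accumulator: note that accumulator ++ a only builds a new collection and discards it, while ++= appends in place (whether the java.lang.Iterable signature used above is the one the planner expects for this accumulator type is left as in the original question):

import scala.collection.mutable.ListBuffer

// merge all partial accumulators into `accumulator` in place
def merge(accumulator: ListBuffer[Float], its: java.lang.Iterable[ListBuffer[Float]]): Unit = {
  val iter = its.iterator()
  while (iter.hasNext) {
    accumulator ++= iter.next()   // ++= mutates the ListBuffer; a bare ++ result would be thrown away
  }
}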

Re: question about sliding-window output in Flink

2021-01-17 by 赵一旦
I see you also wrote "trigger the computation once per minute". Did you add a custom trigger or something? That may be what broke the logic.

The default behaviour already gives you exactly what you want; do not use a custom trigger here.
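For reference, a minimal sketch of that default behaviour (event time, a 1 h window sliding every 1 min; the key selector, source stream and aggregate function are placeholders):

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// per key, one aggregated result comes out every minute, each covering the
// preceding one-hour window; no custom trigger is needed
input                                   // assumed DataStream with timestamps/watermarks assigned
    .keyBy(e -> e.getKey())             // placeholder key selector
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
    .aggregate(new MyAggregate());      // placeholder AggregateFunction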

赵一旦 wrote on Mon, Jan 18, 2021 at 3:52 PM:

> To add: this is per key, one result every 1 min, using a 1 h window that slides every 1 min.
>
>
>
> 赵一旦 wrote on Mon, Jan 18, 2021 at 3:51 PM:
>
>> You seem to have missed my point. What I mean is that your requirement is exactly what a sliding window already gives you.
>> I am not sure what you mean by "the earliest" one.
>>
>> With an event-time sliding window, as long as you have not configured anything else, you simply get one result per minute.
>>
>> eriendeng wrote on Mon, Jan 18, 2021 at 11:42 AM:
>>
>>> Only a sliding window satisfies your once-per-minute requirement, right? As for wanting only the earliest/latest record, you can take the stream produced by the window group-by, group it again
>>> and then filter the data you want by window time.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>>


Re: question about sliding-window output in Flink

2021-01-17 by 赵一旦
To add: this is per key, one result every 1 min, using a 1 h window that slides every 1 min.



赵一旦 wrote on Mon, Jan 18, 2021 at 3:51 PM:

> You seem to have missed my point. What I mean is that your requirement is exactly what a sliding window already gives you.
> I am not sure what you mean by "the earliest" one.
>
> With an event-time sliding window, as long as you have not configured anything else, you simply get one result per minute.
>
> eriendeng wrote on Mon, Jan 18, 2021 at 11:42 AM:
>
>> Only a sliding window satisfies your once-per-minute requirement, right? As for wanting only the earliest/latest record, you can take the stream produced by the window group-by, group it again
>> and then filter the data you want by window time.
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: question about sliding-window output in Flink

2021-01-17 by 赵一旦
You seem to have missed my point. What I mean is that your requirement is exactly what a sliding window already gives you.
I am not sure what you mean by "the earliest" one.

With an event-time sliding window, as long as you have not configured anything else, you simply get one result per minute.

eriendeng wrote on Mon, Jan 18, 2021 at 11:42 AM:

> Only a sliding window satisfies your once-per-minute requirement, right? As for wanting only the earliest/latest record, you can take the stream produced by the window group-by, group it again
> and then filter the data you want by window time.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql HOP window with a custom UDAF: no matching merge method found

2021-01-17 by bigdata
Hi,
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]]{
  // the percentiles to report
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99
 
  override def getValue(accumulator: ListBuffer[Float]): String = {
// locate each percentile's position in the sorted list (1-based, clamped to at least 1)
val length = accumulator.size
var i1 = Math.round(length*percentile1).toInt
if(i1==0) i1 = 1
var i2 = Math.round(length*percentile2).toInt
if(i2==0) i2 = 1
var i3 = Math.round(length*percentile3).toInt
if(i3==0) i3 = 1
var i4 = Math.round(length*percentile4).toInt
if(i4==0) i4 = 1
val seq = accumulator.sorted
// the four percentile values, comma-separated
seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }

  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()

  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
accumulator.append(i)
  }

  def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
its.foreach(i => accumulator ++ i)
  }








Re: flink sql: Kafka messages written to Hive, partitions are not committed

2021-01-17 by 花乞丐
Thanks everyone for the patient answers. I have found the problem; it was an issue with how I was using the watermark, my own mistake, sorry.

After fixing that, partitions were still not committed. More debugging shows the watermark value is now OK, but because Flink's toMills method uses UTC, the value extracted from the partition is 8 hours larger than the original, so the watermark always stays below
partition_time + commitDelay. I will handle that accordingly.
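For anyone comparing notes, a sketch of the partition-commit options this thread revolves around (table, columns and values are placeholders, not recommendations):

-- Hive sink whose partitions are committed once the watermark passes
-- partition time + delay, then registered in the metastore
SET table.sql-dialect=hive;
CREATE TABLE ods.order_info_sink (
  id BIGINT,
  total_amount DECIMAL(10, 5)
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);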

 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql HOP window with a custom UDAF: no matching merge method found

2021-01-17 by Evan
[reply text lost to the archive's encoding]


From: bigdata
Date: 2021-01-18 14:52
To: user-zh
Subject: flink sql HOP window with a custom UDAF: no matching merge method found
Hi,
On flink 1.10.1, a SQL HOP window that uses a custom UDAF fails because the merge method cannot be found:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
What I have traced so far about the missing merge method:
1. ImperativeAggCodeGen.checkNeededMethods calls getUserDefinedMethod when needMerge is true;
2. in UserDefinedFunctionUtils.getUserDefinedMethod the check below evaluates to false for my merge method. Is this a Flink bug?
parameterClassEquals(methodSignature(i), clazz) ||
parameterDataTypeEquals(internalTypes(i), dataTypes(i))


The merge method is:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
  its.foreach(i => accumulator ++ i)
}


flink sql HOP window with a custom UDAF: no matching merge method found

2021-01-17 by bigdata
Hi,
On flink 1.10.1, a SQL HOP window that uses a custom UDAF fails because the merge method cannot be found:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
What I have traced so far about the missing merge method:
1. ImperativeAggCodeGen.checkNeededMethods calls getUserDefinedMethod when needMerge is true;
2. in UserDefinedFunctionUtils.getUserDefinedMethod the check below evaluates to false for my merge method. Is this a Flink bug?
parameterClassEquals(methodSignature(i), clazz) ||
parameterDataTypeEquals(internalTypes(i), dataTypes(i))


The merge method is:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
  its.foreach(i => accumulator ++ i)
}

Re: Re: flink sql: Kafka messages written to Hive, partitions are not committed

2021-01-17 by Shengkai Fang
This usually happens when one of the Kafka partitions has no data, so the overall watermark does not advance. In that case you normally need to set the idle
source option manually [1]. Note that the community's watermark push-down currently has some issues [2] that are already being fixed.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
[2]
https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
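A sketch of setting that option from the Table API (the 10 s value is only an example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// let a source partition be marked idle after 10 s without data, so the
// overall watermark can still advance (the option from [1] above)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "10 s");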


花乞丐 wrote on Mon, Jan 18, 2021 at 11:42 AM:

>
> The code is attached. Data is already being written to HDFS and files are produced, but the watermark I added has no effect, so the metastore is never updated and contains no partition info; I have to run the following in the hive
> shell first: hive (ods)> msck repair table
>
> order_info. Only then can the data be queried. Debugging shows that a partition is committed only when the watermark is larger than the value extracted from the partition plus the delay, but right now the watermark stays at Long.MIN_VALUE, so partitions are never committed. I did set a watermark in the code. Is my watermark setup wrong? Please advise!
> package com.zallsteel.flink.app.log;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import com.google.gson.Gson;
> import com.google.gson.JsonArray;
> import com.google.gson.JsonElement;
> import com.google.gson.JsonParser;
> import com.zallsteel.flink.entity.ChangelogVO;
> import com.zallsteel.flink.entity.OrderInfo;
> import com.zallsteel.flink.utils.ConfigUtils;
>
> import lombok.SneakyThrows;
> import org.apache.commons.lang3.time.FastDateFormat;
> import org.apache.flink.api.common.eventtime.*;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
>
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.text.ParseException;
> import java.time.Duration;
> import java.util.Date;
> import java.util.Properties;
>
> /**
>  * @author Jackie Zhu
>  * @time 2021-01-13 16:50:18
>  * @desc 测试MySQLCDC to Hive
>  */
> public class MySQLCDC2HiveApp {
> public static void main(String[] args) {
> //获取执行环节
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 设置并发
> env.setParallelism(6);
> //设置checkpoint
> env.enableCheckpointing(6);
> env.getConfig().setAutoWatermarkInterval(200);
> // 设置Flink SQL环境
> EnvironmentSettings tableEnvSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> // 创建table Env
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env,
> tableEnvSettings);
> // 设置checkpoint 模型
>
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);
> // 设置checkpoint间隔
>
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofMinutes(1));
> // 指定catalog名称
> String catalogName = "devHive";
> // 创建HiveCatalog
> HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
> "default",
> "/home/beggar/tools/apache-hive-3.1.2-bin/conf",
> "/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
> "3.1.2"
> );
> //注册 Hive Catalog
> tableEnv.registerCatalog(catalogName,hiveCatalog);
> //使用hive Catalog
> tableEnv.useCatalog(catalogName);
> //创建mysql cdc 数据源
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
> // 创建mysql cdc 数据表
> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
> tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
> "id BIGINT,\n" +
> "user_id BIGINT,\n" +
> "create_time TIMESTAMP,\n" +
> "operate_time TIMESTAMP,\n" +
> "province_id INT,\n" +
> "order_status STRING,\n" +
> "total_amount DECIMAL(10, 5)\n" +
> "  ) WI

Fwd: flink: setting the MapStateDescriptor for a broadcast stream

2021-01-17 by smq
Sent from my iPhone


-- Original message --
From: 明启 孙 <374060...@qq.com>
Sent: Jan 18, 2021, 11:30
To: user-zh 

flink sql HOP window with a custom UDAF: no matching merge method found

2021-01-17 by bigdata
Hi,
On flink 1.10.1, a SQL HOP window that uses a custom UDAF fails because the merge method cannot be found:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
What I have traced so far about the missing merge method:
1. ImperativeAggCodeGen.checkNeededMethods calls getUserDefinedMethod when needMerge is true;
2. in UserDefinedFunctionUtils.getUserDefinedMethod the check below evaluates to false for my merge method. Is this a Flink bug?
parameterClassEquals(methodSignature(i), clazz) ||
parameterDataTypeEquals(internalTypes(i), dataTypes(i))


The merge method is:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
  its.foreach(i => accumulator ++ i)
}

Re: question about sliding-window output in Flink

2021-01-17 by eriendeng
Only a sliding window satisfies your once-per-minute requirement, right? As for wanting only the earliest/latest record, you can take the stream produced by the window group-by, group it again
and then filter the data you want by window time.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink sql: Kafka messages written to Hive, partitions are not committed

2021-01-17 by 花乞丐
The code is attached. Data is already being written to HDFS and files are produced, but the watermark I added has no effect, so the metastore is never updated and contains no partition info; I have to run the following in the hive
shell first: hive (ods)> msck repair table
order_info. Only then can the data be queried. Debugging shows that a partition is committed only when the watermark is larger than the value extracted from the partition plus the delay, but right now the watermark stays at Long.MIN_VALUE, so partitions are never committed. I did set a watermark in the code. Is my watermark setup wrong? Please advise!
package com.zallsteel.flink.app.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.entity.OrderInfo;
import com.zallsteel.flink.utils.ConfigUtils;

import lombok.SneakyThrows;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc 测试MySQLCDC to Hive
 */
public class MySQLCDC2HiveApp {
public static void main(String[] args) {
//获取执行环节
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并发
env.setParallelism(6);
//设置checkpoint
env.enableCheckpointing(6);
env.getConfig().setAutoWatermarkInterval(200);
// 设置Flink SQL环境
EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// 创建table Env
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
tableEnvSettings);
// 设置checkpoint 模型
   
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);
// 设置checkpoint间隔
   
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMinutes(1));
// 指定catalog名称
String catalogName = "devHive";
// 创建HiveCatalog
HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
"default",
"/home/beggar/tools/apache-hive-3.1.2-bin/conf",
"/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
"3.1.2"
);
//注册 Hive Catalog
tableEnv.registerCatalog(catalogName,hiveCatalog);
//使用hive Catalog
tableEnv.useCatalog(catalogName);
//创建mysql cdc 数据源
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
// 创建mysql cdc 数据表
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
"  ) WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'beggar',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = '123456',\n" +
"'database-name' = 'cdc',\n" +
"'table-name' = 'order_info'\n" +
")");
// 创建kafka source
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
tableEnv.executeSql("CREATE TABLE k

flink: setting the MapStateDescriptor for a broadcast stream

2021-01-17 by 明启 孙
Hi all:
In MapStateDescriptor (String name, TypeInformation keyTypeInfo,
TypeInformation
valueTypeInfo), what does the second parameter, the key, refer to? The value refers to the state value. Most examples online use String for the key, and I do not know what this key should be based on.

smq



Re: Re: flink sql: Kafka messages written to Hive, partitions are not committed

2021-01-17 by 花乞丐
package com.zallsteel.flink.app.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.entity.OrderInfo;
import com.zallsteel.flink.utils.ConfigUtils;

import lombok.SneakyThrows;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

/**
 * @desc Test MySQLCDC to Hive
 */
public class MySQLCDC2HiveApp {
public static void main(String[] args) {
// get the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// set the parallelism
env.setParallelism(6);
// enable checkpointing
env.enableCheckpointing(6);
env.getConfig().setAutoWatermarkInterval(200);
// set up the Flink SQL environment
EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// create the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
tableEnvSettings);
// set the checkpointing mode
   
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);
// set the checkpoint interval
   
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofMinutes(1));
// name of the catalog
String catalogName = "devHive";
// create the HiveCatalog
HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
"default",
"/home/beggar/tools/apache-hive-3.1.2-bin/conf",
"/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
"3.1.2"
);
// register the Hive catalog
tableEnv.registerCatalog(catalogName,hiveCatalog);
// use the Hive catalog
tableEnv.useCatalog(catalogName);
// create the mysql cdc source database
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
// create the mysql cdc source table
tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
"  ) WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'beggar',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = '123456',\n" +
"'database-name' = 'cdc',\n" +
"'table-name' = 'order_info'\n" +
")");
// create the kafka source
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
"id BIGINT,\n" +
"user_id BIGINT,\n" +
"create_time TIMESTAMP,\n" +
"operate_time TIMESTAMP,\n" +
"province_id INT,\n" +
"order_status STRING,\n" +
"total_amount DECIMAL(10, 5)\n" +
  

Is there a plan for flink native k8s to support a hostAlias setting?

2021-01-17 by 高函


Does the community plan to support configuring hostAliases in native k8s mode?
If I extend it myself, do I need to add the corresponding hostAliases option in the module and build a custom docker image?
Thanks~


Re: Re: how does flink read the kafka offset

2021-01-17 by hoose
My understanding is: although we are not restoring from a savepoint, the kafka
consumer_topic already has the group id we saved earlier, so by default flink is set up like this:

setStartFromGroupOffsets(): the default; start from the offsets recorded for the consumer group, continuing from the last offsets.

So it can still be understood as continuing to consume from the last committed offset, right?
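A sketch of that default spelled out (broker, group id, topic and schema are placeholders; committing offsets back to Kafka on checkpoints is what keeps the group offsets current for a plain restart):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// start from the consumer group's committed offsets (the default, written out
// explicitly) and commit offsets back to Kafka when checkpoints complete
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
props.setProperty("group.id", "my-group");               // placeholder

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);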



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: question about sliding-window output in Flink

2021-01-17 by marble.zh...@coinflex.com.INVALID
 Yes. The problem now is that the sliding window produces multiple results, while I only want to output the result of the earliest window.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink CEP: dynamically loading patterns

2021-01-17 by mokaful
Hi, regarding dynamically updating CEP patterns: has the original poster made any breakthrough we could borrow from?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink monitoring

2021-01-17 by penguin.

Then how can I monitor, in real time, each node's CPU and memory usage and the amount of traffic between nodes?
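One common route, sketched as flink-conf.yaml entries (the reporter choice and port range are assumptions, not the only option): expose Flink's per-TaskManager system metrics, such as JVM CPU load and memory usage, through a metrics reporter and collect them from there.

# expose TaskManager/JobManager metrics to an external system (example values)
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260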

















On 2021-01-18 10:15:22, "赵一旦" wrote:
>A slot seems to be only a logical concept; monitoring it is of little value since there is no resource isolation.
>
>penguin. wrote on Fri, Jan 15, 2021 at 5:06 PM:
>
>> Hi,
>> In a flink cluster, can each TaskSlot of a TaskManager be monitored, e.g. per-slot CPU and memory usage?
>>
>>
>> penguin


Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by 刘海
One more question: I already have one job running, and when I submit another job the output below is printed; checking yarn shows the job never starts. Have you seen this before?


[root@cdh1 flink-1.12.0]# ./bin/flink run -d -t yarn-per-job  -D 
jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D 
heartbeat.timeout=180 
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/opt/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[root@cdh1 flink-1.12.0]#




| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
On Jan 18, 2021, 10:42, 刘海 wrote:
It is a path on my local server. Do I need to upload this jar to all three nodes?


It is placed under /opt/flink-1.12.0/examples


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
On Jan 18, 2021, 10:38, Yangze Guo wrote:
Is this path local to the machine you submit from? The client must be able to find the jar at this path.

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海  wrote:

Hi,
I tried your suggestion
and changed the submit command to: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D
jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D
heartbeat.timeout=180
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


I used an absolute path for the jar: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


The result is an exception that the jar cannot be found:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and 
dependencies from JAR file: JAR file does not exist: 1536
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
On Jan 18, 2021, 10:12, Yangze Guo wrote:
Hi, please use -D; -tm and -jm do not need the y prefix

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:


刘海
liuha...@163.com
Signature customized with NetEase Mail Master
On Jan 18, 2021, 09:15, 刘海 wrote:

Hi  Dear All,
A question please; here is my cluster setup:
1. I am using flink 1.12;
2. a three-node hadoop cluster built on CDH 6.3.2, using the yarn that ships with CDH;
3. flink run mode: Per-Job Cluster on yarn (three nodes, each with 48 cores and 64 GB of memory);
4. below is the flink-conf.yaml of my three nodes; apart from jobmanager.rpc.address the configuration is identical on all three:

#==
# Common 通用设置选项
#==
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink 
memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to 

Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by 刘海
It is a path on my local server. Do I need to upload this jar to all three nodes?


It is placed under /opt/flink-1.12.0/examples


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
在2021年1月18日 10:38,Yangze Guo 写道:
请问这个路径是你本地的路径么?需要client端能根据这个路径找到jar包

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海  wrote:

你好
根据你的建议我试了一下
将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D 
jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D 
heartbeat.timeout=180  
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


结果出现找不到jar包的异常:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and 
dependencies from JAR file: JAR file does not exist: 1536
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
在2021年1月18日 10:12,Yangze Guo 写道:
Hi, 请使用 -D -tm -jm 不需要加y前缀

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:


刘海
liuha...@163.com
签名由 网易邮箱大师 定制
在2021年1月18日 09:15,刘海 写道:

Hi  Dear All,
请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==
# Common 通用设置选项
#==
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink 
memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==
# High Availability
#==
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
# the small ground truth for checkpoint and leader election, this location 
stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list

Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by Yangze Guo
Is this path local to the machine you submit from? The client must be able to find the jar at this path.

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海  wrote:
>
> 你好
>  根据你的建议我试了一下
> 将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D 
> jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB 
> -D heartbeat.timeout=180  
> /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> 结果出现找不到jar包的异常:
> org.apache.flink.client.cli.CliArgsException: Could not get job jar and 
> dependencies from JAR file: JAR file does not exist: 1536
> at 
> org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>  [hadoop-common-3.0.0-cdh6.3.2.jar:?]
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  [flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
> at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> ... 8 more
>
>
> | |
> 刘海
> |
> |
> liuha...@163.com
> |
> 签名由网易邮箱大师定制
> 在2021年1月18日 10:12,Yangze Guo 写道:
> Hi, 请使用 -D -tm -jm 不需要加y前缀
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:
>
>
> 刘海
> liuha...@163.com
> 签名由 网易邮箱大师 定制
> 在2021年1月18日 09:15,刘海 写道:
>
> Hi  Dear All,
> 请教各位一个问题,下面是我的集群配置:
> 1、我现在使用的是flink1.12版本;
> 2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
> 3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
> 4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:
>
> #==
> # Common 通用设置选项
> #==
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, 
> including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, 
> including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size 
> instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and 
> Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
> #TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
> #分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other 
> parallelism.
> #当未在任何地方指定并行度时使用的默认并行性(默认值:1)
> parallelism.default: 1
> #添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==
> # High Availability
> #==
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The path where metadata for master recovery is persisted. While ZooKeeper 
> stores
> # the small ground truth for checkpoint and leader election, this location 
> stores
> # the larger objects, like persisted dataflow graphs.
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> high-availability.storageDir: hdfs:///flink/ha/
> 


Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by 刘海
Hi,
I tried your suggestion
and changed the submit command to: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D
jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D
heartbeat.timeout=180
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


I used an absolute path for the jar: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


The result is an exception that the jar cannot be found:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and 
dependencies from JAR file: JAR file does not exist: 1536
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 
org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
On Jan 18, 2021, 10:12, Yangze Guo wrote:
Hi, please use -D; -tm and -jm do not need the y prefix

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:


刘海
liuha...@163.com
签名由 网易邮箱大师 定制
在2021年1月18日 09:15,刘海 写道:

Hi  Dear All,
请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==
# Common 通用设置选项
#==
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink 
memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==
# High Availability
#==
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
# the small ground truth for checkpoint and leader election, this location 
stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#===

flink sql HOP window with a custom UDAF

2021-01-17 by bigdata
Hi,
On flink 1.10.1, a SQL HOP window that uses a custom UDAF fails because the merge method cannot be found:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge
method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
The merge method is:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): 
Unit = {
  its.foreach(i => accumulator ++ i)
}

Question about the semantics of the PROCTIME() function

2021-01-17 by smailxie
If the semantics of the PROCTIME() function is the local time at which the machine processes the record, why does flink take the time in the UTC time zone?













--

Name:谢波
Mobile:13764228893


Re: question about sliding-window output in Flink

2021-01-17 by 赵一旦
From your description, what you are describing is simply a sliding window.
9-10, 9:01-10:01, ...

marble.zh...@coinflex.com.INVALID
wrote on Fri, Jan 15, 2021 at 5:45 PM:

> Hi all, my scenario is: the window size is one hour and the statistics are triggered once per minute. For example, at 10 o'clock, compute the result for 9 to 10;
> the next minute, at 10:01, compute the result for 9:01 to 10:01.
>
> With a sliding window, e.g. .timeWindow(Time.hours(1L), Time.minutes(1)),
> 60/1 = 60 result sets are emitted, which is not what I want; I only want the result for the one hour before the current time.
>
> Apart from filtering in the window function API, is there another way to implement this?
>
> A tumbling window does not fit: it always outputs on the whole hour, e.g. 9 to 10 and then jumps to 10 to 11, which also does not meet my business requirement.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink monitoring

2021-01-17 by 赵一旦
A slot seems to be only a logical concept; monitoring it is of little value since there is no resource isolation.

penguin. wrote on Fri, Jan 15, 2021 at 5:06 PM:

> Hi,
> In a flink cluster, can each TaskSlot of a TaskManager be monitored, e.g. per-slot CPU and memory usage?
>
>
> penguin


Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by Yangze Guo
Hi, please use -D; -tm and -jm do not need the y prefix

Best,
Yangze Guo

Best,
Yangze Guo
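A sketch of a submit command along those lines, assuming the memory sizes below and that the dynamic properties are written as -Dkey=value with the generic -t yarn-per-job CLI, with the jar path last:

./bin/flink run -d -t yarn-per-job \
  -Djobmanager.memory.process.size=1536m \
  -Dtaskmanager.memory.process.size=3072m \
  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar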


On Mon, Jan 18, 2021 at 9:19 AM 刘海  wrote:
>
>
> 刘海
> liuha...@163.com
> 签名由 网易邮箱大师 定制
> 在2021年1月18日 09:15,刘海 写道:
>
> Hi  Dear All,
>请教各位一个问题,下面是我的集群配置:
> 1、我现在使用的是flink1.12版本;
> 2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
> 3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
> 4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:
>
> #==
> # Common 通用设置选项
> #==
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, 
> including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, 
> including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size 
> instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and 
> Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
> #TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
> #分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other 
> parallelism.
> #当未在任何地方指定并行度时使用的默认并行性(默认值:1)
> parallelism.default: 1
> #添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==
> # High Availability
> #==
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The path where metadata for master recovery is persisted. While ZooKeeper 
> stores
> # the small ground truth for checkpoint and leader election, this location 
> stores
> # the larger objects, like persisted dataflow graphs.
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> high-availability.storageDir: hdfs:///flink/ha/
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
> high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root: /flink
> #==
> # Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
> #==
> state.backend: rocksdb
> #选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
> #而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
> #而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
> state.backend.incremental: true
> #是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
> state.backend.local-recovery: true
> #RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
> state.backend.rocksdb.block.cache-size: 268435456
> #这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
> state.backend.rocksdb.timer-service.factory: HEAP
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
> state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> # Default target directory for savepoints, optional.
> #保存点的默认目录。由状态后端用于将保存点写入文件系统
> state.savepoints.dir: hdfs:///flink/flink-savepoints
> # 要保留的最大已完成检查点数
> state.checkpoints.num-retained: 3
> #此选项指定作业计算如何从任务失败中恢复。可接受的值为:
> #'full':重新启动所有任务以恢复作业。
> #“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
> jobmanager.execution.failover-strategy: region
> #==
> # Advanced
> #==
>
> # Override the directories for temporary files. If not specified, the
> # system-specific Java temporary directory (java.io.tmpdir property) is taken.
> #
> # For framework setups on Yarn or Mesos, Flink will automatically pick up the
> # contai

Re: flink 1.12.0 HA on native k8s: after running for a while, jobmanager-leader produces a large number of ConfigMaps

2021-01-17 by macdoor
Hi, I have just started using flink 1.12.1 HA on
k8s and found that the jobmanager restarts roughly every half hour, always with this kind of error. Have you run into this? Thanks!

2021-01-17 04:52:12,399 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending
SlotPool.
2021-01-17 04:52:12,399 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
  
[] - Close ResourceManager connection 28ed7c84e7f395c5a34880df91b251c6:
Stopping JobMaster for job p_port_traffic_5m@hive->mysql @2021-01-17
11:40:00(67fb9b15d0deff998e287aa7e2cf1c7b)..
2021-01-17 04:52:12,399 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
SlotPool.
2021-01-17 04:52:12,399 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
8c450d0051eff8c045adb76cb9ec4...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_32
for job 67fb9b15d0deff998e287aa7e2cf1c7b from the resource manager.
2021-01-17 04:52:12,399 INFO 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-01-17 04:52:12,399 INFO 
org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
[] - Closing
KubernetesLeaderElectionDriver{configMapName='test-flink-etl-67fb9b15d0deff998e287aa7e2cf1c7b-jobmanager-leader'}.
2021-01-17 04:52:12,399 INFO 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
[] - The watcher is closing.
2021-01-17 04:52:12,416 INFO 
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
job graph 67fb9b15d0deff998e287aa7e2cf1c7b from
KubernetesStateHandleStore{configMapName='test-flink-etl-dispatcher-leader'}.
2021-01-17 04:52:30,686 ERROR
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Fatal error occurred in ResourceManager.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
while watching the ConfigMap
test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
at
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-01-17 04:52:30,691 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
while watching the ConfigMap
test-flink-etl-12c0ac13184d3d98af71dadbc4a81d03-jobmanager-leader
at
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherTog

K8s HA session mode, 1.12.1: jobmanager restarts periodically

2021-01-17 by macdoor
It restarts roughly every few tens of minutes. Any ideas on how to investigate? The error thrown is the same every time, and after running for a while a lot of ConfigMaps also accumulate. Below is one concrete error.

Error content

2021-01-17 04:16:46,116 ERROR
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Fatal error occurred in ResourceManager.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
while watching the ConfigMap
test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
at
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-01-17 04:16:46,117 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
while watching the ConfigMap
test-flink-etl-42557c3f6325ffc876958430859178cd-jobmanager-leader
at
org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[fl

Re: yarn Per-Job Cluster mode: memory parameters passed on the CLI have no effect

2021-01-17 by 刘海


| |
刘海
|
|
liuha...@163.com
|
Signature customized with NetEase Mail Master
On Jan 18, 2021, 09:15, 刘海 wrote:
Hi  Dear All,
   A question please; here is my cluster setup:
1. I am using flink 1.12;
2. a three-node hadoop cluster built on CDH 6.3.2, using the yarn that ships with CDH;
3. flink run mode: Per-Job Cluster on yarn (three nodes, each with 48 cores and 64 GB of memory);
4. below is the flink-conf.yaml of my three nodes; apart from jobmanager.rpc.address the configuration is identical on all three:


#==
# Common 通用设置选项
#==
jobmanager.rpc.address: cdh1


# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m


# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m


# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink 
memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme


#==
# High Availability
#==
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
# the small ground truth for checkpoint and leader election, this location 
stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块缓存的大小,单位字节。RocksDB的默认块缓存大小为 8MB
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled 
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==
# Advanced
#==


# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
#

yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

2021-01-17 文章 刘海
Hi  Dear All,
   请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster on yarn(三个节点,每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:


#==
# Common 通用设置选项
#==
jobmanager.rpc.address: cdh1


# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m


# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m


# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink 
memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme


#==
# High Availability
#==
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
# the small ground truth for checkpoint and leader election, this location 
stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块缓存的大小,单位字节。RocksDB的默认块缓存大小为 8MB
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled 
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==
# Advanced
#==


# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order t
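
The quoted question is truncated above, so the exact CLI options that were tried are not visible. As a hedged sketch (cluster and jar path are placeholders), memory for a Flink 1.12 per-job submission is normally overridden in one of two ways, and in both cases the options must come before the user jar, otherwise they are handed to the program as arguments instead of being parsed by the CLI:

# Generic per-job deployment: override flink-conf.yaml values with -D
flink run -t yarn-per-job \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=6144m \
  /path/to/job.jar

# Legacy YARN CLI shortcuts
flink run -m yarn-cluster -yjm 2048m -ytm 6144m /path/to/job.jar

The effective values can be checked in the JobManager log or in the web UI's Job Manager > Configuration page to confirm whether the override actually reached the cluster.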

Re: flink sql hop window UDAF error

2021-01-17 文章 Evan
The merge method in your code is spelled marge — rename it to merge and the validation error should go away.


Evan Cheng
2021年1月18日 09:00:07
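
A minimal sketch of what the fixed method could look like, inside the PercentileUDAF class quoted below (ListBuffer is scala.collection.mutable.ListBuffer, as in the original code). Besides the name, note that the SQL planner resolves merge reflectively with a java.lang.Iterable parameter, and that `accumulator ++ i` builds a new collection and discards it, so the in-place `++=` is used here:

  def merge(accumulator: ListBuffer[Float], its: java.lang.Iterable[ListBuffer[Float]]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      // ++= appends the partial accumulator in place; a bare ++ would discard the result
      accumulator ++= iter.next()
    }
  }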



 
From: bigdata
Sent: 2021-01-17 22:31
To: user-zh
Subject: flink sql hop window UDAF error
Hi all,
    Using a custom UDAF in a HOP window on Flink 1.10.1 SQL fails with the error below, even though the function defines a marge method:
org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static
The UDAF code is as follows:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]] {
  // percentiles to report
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99

  /**
   * Computes the percentile values from the collected samples.
   * @param accumulator the collected samples
   * @return the four percentiles as a comma-separated string
   */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    // index of each percentile in the sorted sample list (at least 1)
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    // concatenate the four percentile values
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }

  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()

  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }

  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i => accumulator ++ i)
  }


flink sql hop window UDAF error

2021-01-17 文章 bigdata
Hi all,
    Using a custom UDAF in a HOP window on Flink 1.10.1 SQL fails with the error below, even though the function defines a marge method:
org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static
The UDAF code is as follows:
class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]] {
  // percentiles to report
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99

  /**
   * Computes the percentile values from the collected samples.
   * @param accumulator the collected samples
   * @return the four percentiles as a comma-separated string
   */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    // index of each percentile in the sorted sample list (at least 1)
    val length = accumulator.size
    var i1 = Math.round(length*percentile1).toInt
    if(i1==0) i1 = 1
    var i2 = Math.round(length*percentile2).toInt
    if(i2==0) i2 = 1
    var i3 = Math.round(length*percentile3).toInt
    if(i3==0) i3 = 1
    var i4 = Math.round(length*percentile4).toInt
    if(i4==0) i4 = 1
    val seq = accumulator.sorted
    // concatenate the four percentile values
    seq(i1-1).toInt+","+seq(i2-1).toInt+","+seq(i3-1).toInt+","+seq(i4-1).toInt
  }

  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()

  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }

  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i => accumulator ++ i)
  }
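
For completeness, a hedged sketch of how such a function is registered and used in a HOP window query on Flink 1.10 (table name, field names and the rowtime attribute are made up for illustration; only registerFunction and the HOP group-window syntax are the actual API):

// tableEnv is an existing StreamTableEnvironment; "metrics", "item_id", "latency"
// and "rowtime" are placeholder names.
tableEnv.registerFunction("percentiles", new PercentileUDAF())

val result = tableEnv.sqlQuery(
  """
    |SELECT
    |  item_id,
    |  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) AS window_end,
    |  percentiles(latency) AS latency_percentiles
    |FROM metrics
    |GROUP BY item_id, HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
  """.stripMargin)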

flink sql hop window UDAF error

2021-01-17 文章 bigdata
Hi all,
    Using a custom UDAF in a HOP window on Flink 1.10.1 SQL fails with the error below, even though the function defines a marge method:
org.apache.flink.table.api.ValidationException: Function class 'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static

[Announce] Zeppelin 0.9.0 is released (Flink on Zeppelin)

2021-01-17 文章 Jeff Zhang
Hi flink folks,

I'd like to tell you that Zeppelin 0.9.0 is officially released, in this
new version we made a big refactoring and improvement on flink support. It
supports 3 major versions of flink (1.10, 1.11, 1.12)

You can download zeppelin 0.9.0 here.
http://zeppelin.apache.org/download.html

And check the following 2 links for more details of how to use flink on
zeppelin
https://app.gitbook.com/@jeffzhang/s/flink-on-zeppelin/
http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


-- 
Best Regards

Jeff Zhang


flink集群监控

2021-01-17 文章 penguin.
Hello,


请问在flink集群中,
怎么对TaskManager的每个TaskSlot进行监控呢?比如每个slot的cpu和内存使用率以及slot通信量之类的指标。
有什么办法来获取节点间的通信量呢?
多谢!


penguin

Re:Re: Re: 请教个Flink checkpoint的问题

2021-01-17 文章 邮件帮助中心
1:先前测试过使用stopWithSavepoint时会将之前成功保存的checkpoint数据给删除掉,后来我们查看了下源码,里面描述如下,就是调用该方法时Flink会将程序设置成Finished态的,可能和实际使用场景有出入。
/**
 * Stops a program on Flink cluster whose job-manager is configured in this 
client's configuration.
 * Stopping works only for streaming programs. Be aware, that the program might 
continue to run for
 * a while after sending the stop command, because after sources stopped to 
emit data all operators
 * need to finish processing.
 *
 * @param jobId the job ID of the streaming program to stop
 * @param advanceToEndOfEventTime flag indicating if the source should inject a 
{@code MAX_WATERMARK} in the pipeline
 * @param savepointDirectory directory the savepoint should be written to
 * @return a {@link CompletableFuture} containing the path where the savepoint 
is located
 */

CompletableFuture<String> stopWithSavepoint(final JobID jobId, final boolean
advanceToEndOfEventTime, @Nullable final String savepointDirectory);
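
A small Scala-flavoured sketch of the retention knobs discussed in the quoted replies below, assuming the same streamEnv as in the quoted job code (state.checkpoints.num-retained is a flink-conf.yaml option, not a CheckpointConfig setter):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// Keep externalized checkpoints when the job is cancelled; they are still
// cleaned up when the job ends in a globally terminal FINISHED state.
streamEnv.getCheckpointConfig
  .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// In flink-conf.yaml, retain more than one completed checkpoint so that a new
// savepoint does not immediately subsume the last retained checkpoint:
//   state.checkpoints.num-retained: 2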

在 2021-01-17 16:48:22,"Congxian Qiu"  写道:
>Hi
>你可以看一下是不是 作业的状态变成了 FINISH 了,现在 Flink 在 FINISH 的时候,会删除 checkpoint(就算设置了
>retain on cancel 也会删除)
>PS :FLINK-18263 正在讨论是否在 Job 变为 FINISH 状态后保留 checkpoint
>[1] https://issues.apache.org/jira/browse/FLINK-18263
>Best,
>Congxian
>
>
>yinghua...@163.com  于2021年1月15日周五 上午11:23写道:
>
>> 感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。
>>
>>
>>
>> yinghua...@163.com
>>
>> 发件人: Yun Tang
>> 发送时间: 2021-01-15 11:02
>> 收件人: user-zh
>> 主题: Re: 回复: 请教个Flink checkpoint的问题
>> Hi
>>
>> 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with
>> savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain
>> checkpoint的数量为1而被subsume掉了,也就是被删掉了。
>>
>> 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
>>
>> 另外说一句,即使是已经deprecated的cancel with
>> savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10354
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>>
>> 祝好
>> 唐云
>> 
>> From: yinghua...@163.com 
>> Sent: Thursday, January 14, 2021 19:00
>> To: user-zh 
>> Subject: 回复: 回复: 请教个Flink checkpoint的问题
>>
>> 好的,感谢您的回复!
>>
>>
>>
>> yinghua...@163.com
>>
>> 发件人: Evan
>> 发送时间: 2021-01-14 18:48
>> 收件人: user-zh
>> 主题: 回复: 回复: 请教个Flink checkpoint的问题
>> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>>
>> If you choose to retain externalized checkpoints on cancellation you have
>> to handle checkpoint clean up manually when you cancel the job as well
>> (terminating with job status JobStatus#CANCELED).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>>
>> 如回答有误,请指正。
>>
>>
>>
>>
>>
>> 发件人: yinghua...@163.com
>> 发送时间: 2021-01-14 18:02
>> 收件人: user-zh
>> 主题: 回复: 回复: 请教个Flink checkpoint的问题
>> 代码如下:
>> streamEnv.enableCheckpointing(5 * 60 * 1000);
>> CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
>> checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
>> checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
>> checkPointConfig.setMaxConcurrentCheckpoints(1);
>> checkPointConfig.setTolerableCheckpointFailureNumber(3);
>> checkPointConfig
>>
>> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
>> try {
>>   StateBackend rocksDBStateBackend = new
>> RocksDBStateBackend(checkpointPath);
>>   streamEnv.setStateBackend(rocksDBStateBackend);
>>
>>
>>
>> yinghua...@163.com
>> 发件人: Evan
>> 发送时间: 2021-01-14 17:55
>> 收件人: user-zh
>> 主题: 回复: 请教个Flink checkpoint的问题
>> 代码图挂掉了,看不到代码
>> 发件人: yinghua...@163.com
>> 发送时间: 2021-01-14 17:26
>> 收件人: user-zh
>> 主题: 请教个Flink checkpoint的问题
>>
>> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>>
>> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>> yinghua...@163.com
>>


Re: how flink handles the kafka offset

2021-01-17 文章 ??????
The offsets are stored in the savepoint by the flink-kafka-connector. For configuring the Kafka consumer start position, see:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html


18500348...@163.com
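
A hedged Scala sketch of the start-position behaviour referred to above (topic, schema and properties are placeholders; the precedence comment follows the connector documentation linked above):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092") // placeholder
props.setProperty("group.id", "my-group")            // placeholder

val consumer = new FlinkKafkaConsumer[String]("my_topic", new SimpleStringSchema(), props)

// Start position is only consulted when the job starts WITHOUT restored state:
consumer.setStartFromGroupOffsets() // default: committed group offsets in Kafka
// consumer.setStartFromEarliest()
// consumer.setStartFromLatest()

// When the job is resumed with `-s <checkpoint/savepoint path>`, the offsets
// stored in that checkpoint/savepoint take precedence over the settings above.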
On 2021-01-16 19:38, the original sender wrote:
Hi, with checkpointing enabled in Flink, the Kafka offsets are kept in state. During checkpointing, are the offsets also committed back to Kafka (the consumer_topic / group id offsets), i.e. is the group offset in Kafka only a backup?

When the job is restarted from a checkpoint with -s <chkpoint/savepoint path>, does Flink resume from the offsets in state, or from Kafka's latest/earliest, or from the consumer_topic offsets in Kafka?


Re: Re: 请教个Flink checkpoint的问题

2021-01-17 文章 Congxian Qiu
Hi
你可以看一下是不是 作业的状态变成了 FINISH 了,现在 Flink 在 FINISH 的时候,会删除 checkpoint(就算设置了
retain on cancel 也会删除)
PS :FLINK-18263 正在讨论是否在 Job 变为 FINISH 状态后保留 checkpoint
[1] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian


yinghua...@163.com  于2021年1月15日周五 上午11:23写道:

> 感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。
>
>
>
> yinghua...@163.com
>
> 发件人: Yun Tang
> 发送时间: 2021-01-15 11:02
> 收件人: user-zh
> 主题: Re: 回复: 请教个Flink checkpoint的问题
> Hi
>
> 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with
> savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain
> checkpoint的数量为1而被subsume掉了,也就是被删掉了。
>
> 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
>
> 另外说一句,即使是已经deprecated的cancel with
> savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10354
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>
> 祝好
> 唐云
> 
> From: yinghua...@163.com 
> Sent: Thursday, January 14, 2021 19:00
> To: user-zh 
> Subject: 回复: 回复: 请教个Flink checkpoint的问题
>
> 好的,感谢您的回复!
>
>
>
> yinghua...@163.com
>
> 发件人: Evan
> 发送时间: 2021-01-14 18:48
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
> If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
> streamEnv.enableCheckpointing(5 * 60 * 1000);
> CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
> checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
> checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
> checkPointConfig.setMaxConcurrentCheckpoints(1);
> checkPointConfig.setTolerableCheckpointFailureNumber(3);
> checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
> try {
>   StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>   streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> yinghua...@163.com
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> yinghua...@163.com
>