Re: Re: flink1.14: error registering the mysql connector

2022-02-24 Thread xiaoyue
OK, the data was written to the database successfully. Thank you very much!



xiao...@ysstech.com
 
From: Tony Wei
Sent: 2022-02-25 14:57
To: user-zh
Subject: Re: Re: flink1.14: error registering the mysql connector
Hi xiaoyue,

This line appears to be the cause: tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
You probably need to switch the SqlDialect back to SqlDialect.DEFAULT before running the sink operation.

best regards,
 

streaming mode with both finite and infinite input sources

2022-02-24 Thread Jin Yi
so we have a streaming job where the main work to be done is processing
infinite kafka sources.  recently, i added a fromCollection (finite) source
to simply write some state once upon startup.  this all seems to work
fine.  the finite source operators all finish, while all the infinite
source operators continue running with watermarks.

however, the weird thing is that savepointing throws exceptions, and there
have been no automatic checkpoints at all while the job has been running
for 90+minutes (checkpoint config is unaligned, exactly once every 5m w/ a
1h timeout).

is this mixed finity not a supported setup?


Re: Re: Files grow larger after merging data with a hive overwrite?

2022-02-24 Thread RS
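Thanks. I've confirmed that it is a compression format issue.
The original hive files use SNAPPY compression. After the small files were merged with Flink SQL, the output was uncompressed by default, which is why the files got larger.
By default Flink does not inherit the compression codec of the original files...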




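On 2022-02-22 12:08:39, "junjie.m...@goupwith.com" wrote:

Check whether the data format and the compression format differ from what they were before.


 Original message 
From: RS 
Date: Tue, Feb 22, 2022 09:35
To: user-zh@flink.apache.org
Subject: Files grow larger after merging data with a hive overwrite?
Hi,
My flink job writes to hive with a fairly short checkpoint interval, so it produced many small files, one directory per day.
I then used flink sql to merge the earlier data. After the run finished, I found that the storage had grown. What could cause this?
Before the merge there were many small part files. After the overwrite there are fewer files, but the storage grew from 274MB to 2.9GB?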


The partition field of the hive table table1 is `date`
insert overwrite aw_topic_compact select * from `table1` where `date`='2022-02-21';


Before the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



After the merge:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G    8.7 G    /user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22
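
The two size columns in the listings above look like the output of `hdfs dfs -du -h` (an assumption; the post does not name the command), where the first column is the logical size and the second is the space consumed across all replicas. The following is a minimal Java sketch, using only the figures quoted in the post, that checks how much the 2022-02-21 partition grew and what replication factor the two columns imply. The class and method names are illustrative and not part of any Flink or Hadoop API.

```java
import java.util.Locale;

// Sketch: sanity-check the sizes reported for the date=2022-02-21 partition.
class MergeSizeCheck {

    /** Converts a value with a unit suffix ('M' or 'G', binary multiples assumed) to megabytes. */
    static double toMegabytes(double value, char unit) {
        switch (unit) {
            case 'M':
                return value;
            case 'G':
                return value * 1024.0;
            default:
                throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    /** Plain ratio of two sizes expressed in the same unit. */
    static double ratio(double numeratorMb, double denominatorMb) {
        return numeratorMb / denominatorMb;
    }

    public static void main(String[] args) {
        double beforeMb = toMegabytes(274.0, 'M');        // first column before the merge
        double afterMb = toMegabytes(2.9, 'G');           // first column after the merge
        double beforeOnDiskMb = toMegabytes(822.1, 'M');  // second column before the merge
        double afterOnDiskMb = toMegabytes(8.7, 'G');     // second column after the merge

        System.out.println(String.format(Locale.ROOT,
                "growth after overwrite: %.1fx", ratio(afterMb, beforeMb)));
        System.out.println(String.format(Locale.ROOT,
                "second/first column before: %.1f, after: %.1f",
                ratio(beforeOnDiskMb, beforeMb), ratio(afterOnDiskMb, afterMb)));
    }
}
```

The growth comes out at roughly 10.8x, while the ratio between the two columns stays at about 3 both before and after the merge. That steady factor suggests the second column only reflects replication, so the increase is in the data files themselves, which is at least compatible with the loss of SNAPPY compression that RS reports.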



Re: Re: flink1.14: error registering the mysql connector

2022-02-24 Thread Tony Wei
Hi xiaoyue,

This line appears to be the cause: tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
You probably need to switch the SqlDialect back to SqlDialect.DEFAULT before running the sink operation.

best regards,


Re: Re: flink1.14: error registering the mysql connector

2022-02-24 Thread xiaoyue
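Hi tony,
   Here is the complete code: it reads data from hive, runs the flatmap and aggregate operations, and then sinks the result to mysql. To keep this short, the udf function definitions in the middle are not pasted in full. Please have a look for reference. Thank you very much for your help, and sorry for the trouble.

Code:
// Execution environment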
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

// hive source
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

String confSite = "src\\main\\resources";

String version = "3.1.2";

String defaultDatabase = "fund_analysis";

HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, 
confSite, confSite, version);

tEnv.registerCatalog("hive", hiveCat);

tEnv.useCatalog("hive");
// SQL that reads the data from hive
String biz_date = "20211130";
String tblSource = String.format("select " +
"coalesce(a.rate,0) as yldrate, " +
"coalesce(c.rate,0) as riskless_yldrate, " +
"a.ccy_type, " +
"a.biz_date, " +
"b.is_exch_dt, " +
"a.pf_id " +
"from " +
"ts_pf_yldrate a " +
"inner join td_gl_day b on b.dt = a.biz_date " +
"inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and " +
"c.pf_id = a.pf_id " +
"where a.biz_date <= '%s'", biz_date);
Table table = tEnv.sqlQuery(tblSource);

// Register the flatmap function
tEnv.createTemporarySystemFunction("RowFlatMap", 
SharpeRatioFlatMap.class);
// Register the aggregate function
tEnv.createTemporarySystemFunction("SharpeRatioAgg", 
SharpeRatioAggregate.class);

// Run the flatmap operation
Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
$("riskless_yldrate"),$("ccy_type"),$("biz_date"),
$("is_exch_dt"),$("pf_id"), biz_date));
   
// Switch the catalog and register the table
tEnv.useCatalog("default_catalog");
tEnv.createTemporaryView("tagTable",tagTbl);

// Call the SharpeRatioAgg function to compute the result
Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, " +
"dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as " +
"index_value from tagTable group by dmo_index_code", biz_date));
// result.execute().print(); (--> at this step, result prints successfully)

// Sink operation
String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string" +
") with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'xxx', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);


result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
tEnv.execute("mysql_sink_test");


xiao...@ysstech.com
 

pyflink object to java object

2022-02-24 Thread Francis Conroy
Hi all,

We're using pyflink for most of our flink work and sometimes call into a
java process function.
Our new java process function takes an argument in the constructor, which
is a Row containing default values. I've declared my Row in pyflink like
this:

default_row = Row(ep_uuid="",
  unit_uuid=None,
  unit_longitude=None,
  unit_latitude=None,
  unit_state=None,
  unit_country=None,
  pf_uuid=None,
  pf_name=None)

row_type_information = RowTypeInfo([Types.STRING(),  # ep_uuid
   Types.STRING(),  # unit_uuid
   Types.DOUBLE(),  # unit_longitude
   Types.DOUBLE(),  # unit_latitude
   Types.STRING(),  # unit_state
   Types.STRING(),  # unit_country
   Types.STRING(),  # pf_uuid
   Types.STRING()  # pf_name
   ])

I'm now trying to get a handle to a java row object in the jvm so I can
pass that into the process function's constructor.

endpoint_info_enriched_stream =
DataStream(ds._j_data_stream.connect(endpoint_info_stream._j_data_stream).process(
jvm.org.switchdin.operators.TableEnrich(j_obj)))

I've tried a few approaches, but I really can't figure out how to do this,
I'm not sure what I need on each side for this, a coder, serializer,
pickler?



Re: [Flink-1.14.3] Restart of pod due to duplicate job submission

2022-02-24 Thread Yang Wang
This might be related to FLINK-21928, which seems to have been fixed already
in 1.14.0. The fix has some limitations, though, and users need to clean up
the HA entries manually.


Best,
Yang

On Thu, Feb 24, 2022 at 13:42, Parag Somani wrote:

> Hello,
>
> Recently, due to the log4j vulnerabilities, we upgraded to Apache Flink
> 1.14.3. Since then we have been getting the following exception, and because
> of it the pod goes into CrashLoopBackOff. We have seen this issue especially
> during upgrades or deployments, when an existing pod is already running.
>
> What could be causing this issue at deployment time? Any assistance
> with a workaround would be much appreciated.
>
> Also, I am seeing this issue only after the upgrade from 1.14.2 to 1.14.3.
>
> Env:
> Deployed on : k8s
> Flink version: 1.14.3
> HA using zookeeper
>
> Logs:
> 2022-02-23 05:13:14.555 ERROR 45 --- [t-dispatcher-17]
> c.b.a.his.service.FlinkExecutorService   : Failed to execute job
>
> org.apache.flink.util.FlinkException: Failed to execute job 'events rates
> calculation'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
> ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
> ~[flink-clients_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> ~[flink-clients_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
> ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
> at
> com.bmc.ade.his.service.FlinkExecutorService.init(FlinkExecutorService.java:37)
> ~[health-service-1.0.00.jar:1.0.00]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[na:na]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:na]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:na]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> ~[na:na]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
> ~[spring-beans-5.3.4.jar:5.3.4]
> at
> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:917)
> ~[spring-context-5.3.4.jar:5.3.4]
> at
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:582)
> ~[spring-context-5.3.4.jar:5.3.4]
> at
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
> ~[spring-boot-2.5.5.jar:2.5.5]
> at
> 

Re: flink1.14: error registering the mysql connector

2022-02-24 Thread Tony Wei
Hi xiaoyue,

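Does your test code contain any additional code or configuration? Could you share the complete code and configuration files?
I tried running the following code in my IDE, and it runs successfully.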

public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);

String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string, " +
"  primary key(dmo_index_code) not enforced) " +
"  with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'yss300377@ZT', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink).print();
//tEnv.execute("mysql_sink_test");
}

The output is:
+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set

best regards,



flink1.14: error registering the mysql connector

2022-02-24 Thread xiaoyue
On flink1.14, registering a mysql sink connector fails with an error. I have checked several times and found no syntax error. Please help!

Code:
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

   String mysqlSink = "create table bulk_index_sink(" +
"  biz_date string, " +
"  dmo_index_code string, " +
"  index_value string, " +
"  primary key(dmo_index_code) not enforced) " +
"  with (" +
"   'connector' = 'jdbc', " +
"   'username' = 'root', " +
"   'password' = 'yss300377@ZT', " +
"   'driver' = 'com.mysql.cj.jdbc.Driver', " +
"   'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
"   'table-name' = 'bulk_index_sink')";   
 tEnv.executeSql(mysqlSink);
 tEnv.execute("mysql_sink_test");

Error:
org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "not" at line 1, column 126.
Was expecting one of:
"DISABLE" ...
"ENABLE" ...
"NORELY" ...
"NOVALIDATE" ...
"RELY" ...
"VALIDATE" ...
")" ...
"," ...


at 
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at 
com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" 
at line 1, column 126.



xiao...@ysstech.com
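
The parser position in the error above can be traced back to the DDL text. The following is a minimal Java sketch, assuming the string literals are exactly as posted (including the two leading spaces in each fragment); it finds the 1-based column at which "not enforced" starts in the concatenated statement. The class and method names are illustrative, and the with (...) options are left out because they come after the reported position.

```java
// Sketch: map the parser's "line 1, column 126" back to the DDL text.
class DdlColumnLocator {

    /** Column list of the sink DDL as posted; the with (...) options are omitted on purpose. */
    static String sinkDdlPrefix() {
        return "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string, " +
                "  primary key(dmo_index_code) not enforced) ";
    }

    /** Returns the 1-based column of the first occurrence of token, or -1 if it is absent. */
    static int columnOf(String sql, String token) {
        int index = sql.indexOf(token);
        return index < 0 ? -1 : index + 1;
    }

    public static void main(String[] args) {
        int column = columnOf(sinkDdlPrefix(), "not enforced");
        // prints: "not enforced" starts at column 126
        System.out.println("\"not enforced\" starts at column " + column);
    }
}
```

The result matches the reported position, so the parser is rejecting the "not enforced" clause itself rather than anything in the connector options. The stack trace passes through HiveParser.parse, and the expected tokens (DISABLE, ENABLE, NOVALIDATE, RELY and so on) are the constraint keywords listed in the error message, which fits Tony Wei's diagnosis elsewhere in the thread that the Hive SQL dialect was still active when the DDL was executed.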


Possible BUG in 1.15 SQL JSON_OBJECT()

2022-02-24 Thread Jonathan Weaver
Using the latest SNAPSHOT BUILD.

If I have a column definition as

 .column(
     "events",
     DataTypes.ARRAY(
         DataTypes.ROW(
             DataTypes.FIELD("status", DataTypes.STRING().notNull()),
             DataTypes.FIELD("timestamp", DataTypes.STRING().notNull()),
             DataTypes.FIELD("increment_identifier", DataTypes.STRING().nullable()))))

And a query as

JSON_OBJECT('events' VALUE events) event_json

Will generate JSON correctly ONLY if increment_identifier is NOT NULL but
will throw a NullPointerException on the first record that has that column
as null.

Exception is not helpful.

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: 

Re: Flink job recovery after task manager failure

2022-02-24 Thread Zhilong Hong
Hi, Afek

I've read the log you provided. Since you've set restart-strategy to
exponential-delay and restart-strategy.exponential-delay.initial-backoff to
10s, every time a failover is triggered, the JobManager has to wait for 10
seconds before it restarts the job. If you'd prefer a quicker restart, you
could change the restart strategy to fixed-delay and set a small value for
restart-strategy.fixed-delay.delay.
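
For reference, a minimal flink-conf.yaml sketch of the fixed-delay setup
described above. The keys are the standard restart-strategy options; the
values are illustrative assumptions, not a recommendation for your cluster:

```yaml
# Sketch only: the values below are assumptions chosen for illustration.
restart-strategy: fixed-delay
# Number of restart attempts before the job is declared failed (assumed value).
restart-strategy.fixed-delay.attempts: 10
# Wait between a failure and the next restart attempt (assumed value).
restart-strategy.fixed-delay.delay: 1 s
```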

Furthermore, two more failovers happened during the initialization of the
recovered tasks. During initialization, a task tries to restore its state
from the last valid checkpoint, and a FileNotFound exception was thrown during
that recovery. I'm not quite sure of the reason. Since the recovery succeeded
after two failovers, I think it may be because the local disks of your
cluster are not stable.

Sincerely,
Zhilong

On Thu, Feb 24, 2022 at 9:04 PM Afek, Ifat (Nokia - IL/Kfar Sava) <
ifat.a...@nokia.com> wrote:

> Thanks Zhilong.
>
>
>
> The first launch of our job is fast, I don’t think that’s the issue. I see
> in flink job manager log that there were several exceptions during the
> restart, and the task manager was restarted a few times until it was
> stabilized.
>
>
>
> You can find the log here:
>
> jobmanager-log.txt.gz
> 
>
>
>
> Thanks,
>
> Ifat
>
>
>
> *From: *Zhilong Hong 
> *Date: *Wednesday, 23 February 2022 at 19:38
> *To: *"Afek, Ifat (Nokia - IL/Kfar Sava)" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Flink job recovery after task manager failure
>
>
>
> Hi, Afek!
>
>
>
> When a TaskManager is killed, the JobManager will not be notified until a
> heartbeat timeout happens. Currently, the default value of
> heartbeat.timeout is 50 seconds [1]. That's why it takes more than 30
> seconds for Flink to trigger a failover. If you'd like to shorten the time
> until a failover is triggered in this situation, you could decrease the
> value of heartbeat.timeout in flink-conf.yaml. However, if the value is set
> too small, heartbeat timeouts will happen more frequently and the cluster
> will be unstable. As FLINK-23403 [2] mentions, if you are using Flink 1.14
> or 1.15, you could try to set the value to 10s.
>
>
>
> You mentioned that it takes 5-6 minutes to restart the jobs. That seems a
> bit odd. How long does it take to deploy your job for a brand new launch?
> You could compress the JobManager log, upload it to Google Drive or
> OneDrive, and attach the sharing link. Maybe we can find out what happens
> via the log.
>
>
>
> Sincerely,
>
> Zhilong
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
>
> [2] https://issues.apache.org/jira/browse/FLINK-23403
>
>
>
> On Thu, Feb 24, 2022 at 12:25 AM Afek, Ifat (Nokia - IL/Kfar Sava) <
> ifat.a...@nokia.com> wrote:
>
> Hi,
>
>
>
> I am trying to use Flink checkpoints solution in order to support task
> manager recovery.
>
> I’m running flink using beam with filesystem storage and the following
> parameters:
>
> checkpointingInterval=3
>
> checkpointingMode=EXACTLY_ONCE.
>
>
>
> What I see is that if I kill a task manager pod, it takes flink about 30
> seconds to identify the failure and another 5-6 minutes to restart the jobs.
>
> Is there a way to shorten the downtime? What is an expected downtime in
> case the task manager is killed, until the jobs are recovered? Are there
> any best practices for handling it? (e.g. different configuration
> parameters)
>
>
>
> Thanks,
>
> Ifat
>
>
>
>


Re: Writing Flink data to HDFS

2022-02-24 Thread wenjie li
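1. A fairly simple approach is to expose the HDFS files as a Hive table and
then run SQL like the following:
set hive.merge.mapredfiles = true;
-- merge small files at the end of a Map-Reduce job
set hive.merge.size.per.task = 256000000;
-- size of the merged files
set hive.merge.smallfiles.avgsize = 1600;

insert overwrite table table_name
select
*
from table_name1;
2. Merge the small files directly with Spark's coalesce() or repartition() methods.
Those are roughly the two approaches. Then set up one of them as a scheduled job that merges the small HDFS files periodically.
You can search online for either approach to find concrete implementations.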


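On Thu, Feb 24, 2022 at 15:16, Tony <1298877...@qq.com.invalid> wrote:

> What is the idea behind a scheduled job for merging small files? Or is there any reference material?
>
>
>
> -- Original message --
> From: "wenjie li"
> Sent: Thursday, February 24, 2022, 2:09 PM
> To: "user-zh"
> Subject: Re: Writing Flink data to HDFS
>
>
>
> 1. When writing to HDFS with BucketingSink, you can configure a rolling policy to control the size of the written files.
> 2. If the first approach does not work well because writes are frequent and the output volume is small, consider running a separate scheduled job afterwards that merges the small files.
>
> On Thu, Feb 24, 2022 at 12:10, Tony <1298877...@qq.com.invalid> wrote:
>
> When writing Flink data to HDFS, how can the small-files problem be solved?
> Flink SQL has a small-file compaction strategy, but how do I solve this when writing to HDFS from the Flink DataStream API?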


Re: Flink Statefun and Feature computation

2022-02-24 Thread Igal Shilman
Hello,

For (1), I encourage you to read our documentation [1] and the many talks
available online to understand more about the motivation behind StateFun and
its value. In a nutshell, StateFun provides a few building blocks that make
building distributed stateful applications easier.

For (2), check out our playground repository [2] to see how storage is
configured. It is completely defined by the SDK and is not configured by the
Flink cluster configuration.

I think that the use case you are describing is a good fit for StateFun. If
you check out the latest Flink Forward videos, there were a few that
described how to use StateFun for exactly that [3].

Good luck!
Igal

[1] https://nightlies.apache.org/flink/flink-statefun-docs-stable/
[2] https://github.com/apache/flink-statefun-playground
[3] https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/videos

On Sun, Feb 20, 2022 at 1:54 PM Federico D'Ambrosio 
wrote:

> Hello everyone,
>
> It's been quite a while since I wrote to the Flink ML, because the need
> for a stateful stream processing system never actually arose in my current
> job, until now.
>
> Since the last version I actually tried was Flink 1.9, well before
> Stateful Functions, I had a few questions about some of the latest features.
>
> 1. What use cases were Flink Stateful Functions designed for? As
> far as I understand from the documentation, they are basically processors
> that can be separated from a "main" Flink streaming job (and can be
> integrated with it), but I fail to grasp how they differ from a REST
> endpoint implemented using any other framework.
> 2. How is the storage for these functions configured? I see that the
> storage for the state is accessed via a Context object, so I assume it is
> configured through the Flink cluster configuration?
>
> I would like, then, to elaborate on my use case: we have some 20 CDC
> topics (1 topic per table) on Kafka. Upon the data streamed on these
> topics, we need to compute many features to be used by a ML model. Many of
> these features need to be computed by joining multiple topics and/or need
> the whole history of the field. So, I was wondering if Stateful Functions
> could be a good approach to this problem, where a feature could be
> "packaged" in a single stateful function to be "triggered" by the arrival
> of any new message on the topic configured as its ingress.
>
> So, basically, I'm wondering if they could fit the use case, or we're
> better off with a custom flink job.
>
> Thank you for your time,
> --
> Federico D'Ambrosio
>


Re: Flink job recovery after task manager failure

2022-02-24 Thread Afek, Ifat (Nokia - IL/Kfar Sava)
Thanks Zhilong.

The first launch of our job is fast, I don’t think that’s the issue. I see in 
flink job manager log that there were several exceptions during the restart, 
and the task manager was restarted a few times until it was stabilized.

You can find the log here:
jobmanager-log.txt.gz

Thanks,
Ifat

From: Zhilong Hong 
Date: Wednesday, 23 February 2022 at 19:38
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" 
Cc: "user@flink.apache.org" 
Subject: Re: Flink job recovery after task manager failure

Hi, Afek!

When a TaskManager is killed, the JobManager will not be notified until a
heartbeat timeout happens. Currently, the default value of heartbeat.timeout is
50 seconds [1]. That's why it takes more than 30 seconds for Flink to trigger a
failover. If you'd like to shorten the time until a failover is triggered in
this situation, you could decrease the value of heartbeat.timeout in
flink-conf.yaml. However, if the value is set too small, heartbeat timeouts
will happen more frequently and the cluster will be unstable. As FLINK-23403
[2] mentions, if you are using Flink 1.14 or 1.15, you could try to set the
value to 10s.
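
As a rough sketch, that change could look like this in flink-conf.yaml. This
assumes Flink 1.14, where both options are given in milliseconds; the interval
value is an assumption, chosen only so that several heartbeats fit inside the
timeout:

```yaml
# Sketch only, assuming Flink 1.14 where both values are in milliseconds.
# Detect a lost TaskManager after 10 s instead of the 50 s default.
heartbeat.timeout: 10000
# Assumed value: send heartbeats more often than the 10 s default so that
# a single delayed heartbeat does not immediately count as a timeout.
heartbeat.interval: 2000
```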

You mentioned that it takes 5-6 minutes to restart the jobs. That seems a bit
odd. How long does it take to deploy your job for a brand new launch? You
could compress the JobManager log, upload it to Google Drive or OneDrive, and
attach the sharing link. Maybe we can find out what happens via the log.

Sincerely,
Zhilong

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403

On Thu, Feb 24, 2022 at 12:25 AM Afek, Ifat (Nokia - IL/Kfar Sava)
<ifat.a...@nokia.com> wrote:
Hi,

I am trying to use Flink checkpoints solution in order to support task manager 
recovery.
I’m running flink using beam with filesystem storage and the following 
parameters:
checkpointingInterval=3
checkpointingMode=EXACTLY_ONCE.

What I see is that if I kill a task manager pod, it takes flink about 30 
seconds to identify the failure and another 5-6 minutes to restart the jobs.
Is there a way to shorten the downtime? What is an expected downtime in case 
the task manager is killed, until the jobs are recovered? Are there any best 
practices for handling it? (e.g. different configuration parameters)

Thanks,
Ifat



Re: Flink metrics via Prometheus or OpenTelemetry

2022-02-24 Thread Nicolaus Weidner
Hi Sigalit,

first of all, have you read the docs page on metrics [1], and in particular
the Prometheus section on metrics reporters [2]?
Apart from that, there is also a (somewhat older) blog post about
integrating Flink with Prometheus, including a link to a repo with example
code [3].
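
As a starting point, the reporter setup from [2] boils down to a couple of
lines in flink-conf.yaml. This is a sketch for Flink 1.14; the reporter name
"prom" and the port are assumptions, and the Prometheus reporter jar needs to
be available to Flink (see [2] for the details):

```yaml
# Sketch only, following the Flink 1.14 metric reporter docs [2].
# "prom" is an arbitrary reporter name chosen for this example.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# Port the reporter listens on for Prometheus scrapes (assumed value).
metrics.reporter.prom.port: 9249
```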

Hope that helps to get you started!
Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#metrics
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters/#prometheus
[3] https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html

On Wed, Feb 23, 2022 at 8:42 AM Sigalit Eliazov  wrote:

> Hello. I am looking for a way to expose Flink metrics via OpenTelemetry to
> the GCP Cloud Monitoring dashboard.
> Does anyone have experience with that?
>
> If it is not directly possible, we thought about using Prometheus as
> middleware. If you have experience with that, I would appreciate any
> guidance.
>
> Thanks
>


Re: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.

2022-02-24 Thread Nicolaus Weidner
Hi Jai,

> Do writes to ValueStates/MapStates have a direct impact on churn of the Flink
> State or is the data buffered in between?
>

Writes to keyed state go directly to RocksDB. So there shouldn't be any
memory issues with buffers overflowing or similar. In general, more memory
should increase performance (larger cache sizes before having to write to
disk), but less memory shouldn't cause crashes.

Since the errors you encountered are not that specific, can you provide
full logs surrounding such incidents? There is not much to go on without
further info.

Best,
Nico

>


Re: DataStream API: Parquet File Format with Scala Case Classes

2022-02-24 Thread Fabian Paul
Hi Ryan,

I guess the ticket you are looking for is the following [1]. AFAIK the work
on it hasn't started yet, so we would still appreciate initial designs or
ideas.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25416

On Tue, Feb 22, 2022 at 11:54 PM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Fabian,
>
> Thanks for the response! I'll take a look at the CSVReaderFormat.
>
> Our team is interested in contributing to Parquet. However, our capacity
> for the current sprint is fully committed to other workstreams. I'll put
> this issue onto the backlog and see how it stacks against our internal
> priorities over the next few cycles.
> I did a scan for a JIRA issue for this file format restructure and didn't
> find anything but do you know of a JIRA issue I can subscribe to for this
> issue? Otherwise, I can create an issue for this change with Parquet.
>
> In regards to the "envisioned setup".
>
>> My understanding so far is you have Parquet files with backfill
>> data and want to read all files and then continue the reading from
>> Kafka. Is that correct?
>>
> This is correct, the only modification would be that we want the final
> datastream type to be DataStream[T], where T is a Scala Case Class. The
> user would provide T to the Hybrid Source at time of instantiation. So
> pseudocode would look roughly like:
>
> long switchTimestamp = ...; // derive from file input paths
> // Swap ParquetColumnarRowInputFormat for a Generic ParquetInputFormat
> FileSource fileSource =
>     FileSource.forBulkFileFormat(new ParquetColumnarRowInputFormat(),
>         Path.fromLocalFile(testDir)).build();
> KafkaSource kafkaSource = KafkaSource.builder()
>     .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
>     .build();
> HybridSource hybridSource = HybridSource.builder(fileSource)
>     .addSource(kafkaSource)
>     .build();
>
> DataStream dataStream = env.fromSource(hybridSource, watermarkStrategy, name)
>
>
> Let me know if you have any questions!
>
> Thanks,
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>
>
> On Mon, Feb 21, 2022 at 3:16 AM Fabian Paul  wrote:
>
>> Hi Ryan,
>>
>> Thanks for bringing up this topic. Currently, your analysis is
>> correct, and reading parquet files outside the Table API is rather
>> difficult. The community started an effort in Flink 1.15 to
>> restructure some of the formats to make them better applicable to the
>> DataStream and Table API. You can have a look at the CSV format
>> implementation[1]. Obviously, implementing the Parquet format is more
>> complicated since it is more performance-sensitive.
>>
>> If you are willing to work on it, that would be great. We can also
>> assist with the design and offer guidance during the implementation.
>>
>> One question I'd still like to ask is about your exact envisioned
>> setup. My understanding so far is you have Parquet files with backfill
>> data and want to read all files and then continue the reading from
>> Kafka. Is that correct?
>>
>> Best
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java#L71
>>
>>
>> On Fri, Feb 18, 2022 at 11:22 PM Ryan van Huuksloot
>>  wrote:
>> >
>> > Hello,
>> >
>> >
>> > Context:
>> >
>> > We are working on integrating Hybrid Sources with different Sources and
>> Sinks. I have been working on a Parquet source that allows users to load
>> the FileSource[T] so that the source can be used within Hybrid Sources
>> where the HybridSource is of Type[T].
>> >
>> > The environment is Scala 2.12 and we are using the DataStream API. The
>> generic type “T” used in the email would be a Scala case class.
>> >
>> >
>> > Problem:
>> >
>> > Based on the documentation, it is recommended that you use the
>> ParquetColumnarRowInputFormat as an entrypoint to set up the Source. Given
>> that ParquetColumnarRowInputFormat hard codes RowData, your other sources
>> would then need to be of Type[RowData] to be used in HybridSource - from my
>> experience - and you can’t convert FileSource[RowData] -> FileSource[T].
>> >
>> > An alternative I looked into was extending ParquetVectorizedInputFormat
>> but found that the type restrictions were difficult to reconcile.
>> >
>> >
>> > Potential Solution:
>> >
>> > Create a better AbstractParquetBulkFormat, similar to the
>> AbstractAvroBulkFormat added in 1.15. We would be available to contribute
>> but want to understand if this is a direction Flink is willing to go before
>> putting in the work!
>> >
>> >
>> > Questions:
>> >
>> >
>> > Based on the current implementation of Parquet within Flink, is it
>> correct to say that the only entry-point for parquet is
>> ParquetColumnarRowInputFormat?
>> >
>> > Is there any way to convert a FileSource[RowData] -> FileSource[T]?

reserved key percentile_cont

2022-02-24 Thread ZHANG YU
Hi,
I'm new to Flink. Will the reserved keywords percentile_cont and
percentile_disc be implemented in a foreseeable release? Until then, how does
everyone handle this kind of computation?
