Re:flink的高可用配置

2020-06-18 文章 chaojianok
很奇怪,我打开这封邮件什么内容都没有。
Flink 高可以用配置推荐看下这两篇文档:
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html



















At 2020-06-19 11:09:13, "Tony"  wrote:


Re:Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-18 文章 chaojianok
+1 to support HBase 2.x
And I think the 1.4.x version can be retained for the time being, so that users 
who are currently using the 1.4.x version can have more time to evaluate 
whether their projects need to be upgraded and the cost of upgrading.
















At 2020-06-19 12:35:36, "Jark Wu"  wrote:

+1 to support HBase 2.x

But not sure about dropping support for 1.4.x


I cc'ed to user@ and user-zh@ to hear more feedback from users. 

Best,
Jark


On Thu, 18 Jun 2020 at 21:25, Gyula Fóra  wrote:

Hi All!

I would like to revive an old ticket
 and discussion around
upgrading the HBase version of the connector.

The current HBase version is 1.4.3 which is over 2 years old at this point
and incompatible with the newer HBase versions used at many companies.

We propose to upgrade the connector to the latest version and drop support
for the old version starting from the 1.12 Flink release. This would help
us maintain and improve the HBase connector over time.

If the community agrees we are happy to contribute this upgrade as we have
already developed and tested the updated version.

Cheers,
Gyula


Flink SQL 1.10.0窗口计算结果无法sink

2020-06-18 文章 王超
各位大神求帮忙看一下。

Flink 版本:1.10.0
Planner:blink

我在使用Flink SQL的时候遇到了一个问题,能否帮忙看一下,我尝试在寻找了解决方法,但是没有起作用。
比如我发现类似的问题
https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决。


Flink Table Env配置
*StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();*
*env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*
*env.setParallelism(1);*
*EnvironmentSettings envSetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  *
*StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
envSetting);*
*tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));  *


我这个job应用中定义了两个table,分别为source table “sqlDdlAnaTable”

*String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime
BIGINT, list ARRAY , ts AS
TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL
'5' SECOND)" +*
*" WITH (" +*
*"'connector.type' = 'pravega'," +*
*"'connector.version' = '1'," +*
*"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090 '," +*
*"'connector.connection-config.default-scope' = 'Demo'," +*
*"'connector.reader.stream-info.0.stream' = 'test'," +*
*"'format.type' = 'json'," +*
*"'format.fail-on-missing-field' = 'false', " +*
*"'update-mode' = 'append')";*

和sink table " sqlDdlSinkTable ".

* String sqlDdlSinkTable = "CREATE TABLE tb_sink" +*
*"(id STRING, " +*
*"wStart TIMESTAMP(3) , " +*
*"v FLOAT)" +*
*" WITH (" +*
*"'connector.type' = 'pravega'," +*
*"'connector.version' = '1'," +*
*"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090 '," +*
*"'connector.connection-config.default-scope' = 'Demo'," +*
*"'connector.writer.stream' = 'result'," +*
*"'connector.writer.routingkey-field-name' = 'id'," +*
*"'connector.writer.mode' = 'atleast_once'," +*
*"'format.type' = 'json'," +*
*"'update-mode' = 'append')";*

在数据处理逻辑比较简单,计算10s tumble window的vaule的平均。
我一开始直接打印结果能够明确看到10s中输出一次计算结果,watermark也正常移动。
*String sqlAna = "SELECT ts, id, v " +*
*"FROM tb_JsonRecord " +*
*"WHERE q=1 AND type=1";*
*Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);*
*tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);*

*tableEnv.toAppendStream(tableAnaRecord, Row.class).print()*



但是我尝试将结果insert到sink table中发现,就没有任何结果被写入。
*String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +*
*"SELECT id, " +*
*"TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +*
*"AVG(v) FROM tb_AnaRecord " +*
*"GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";   *
* tableEnv.sqlUpdate(sqlAnaAvg);*


提前感谢!

BR//Chao


Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-18 文章 Jark Wu
+1 to support HBase 2.x
But not sure about dropping support for 1.4.x

I cc'ed to user@ and user-zh@ to hear more feedback from users.

Best,
Jark

On Thu, 18 Jun 2020 at 21:25, Gyula Fóra  wrote:

> Hi All!
>
> I would like to revive an old ticket
>  and discussion around
> upgrading the HBase version of the connector.
>
> The current HBase version is 1.4.3 which is over 2 years old at this point
> and incompatible with the newer HBase versions used at many companies.
>
> We propose to upgrade the connector to the latest version and drop support
> for the old version starting from the 1.12 Flink release. This would help
> us maintain and improve the HBase connector over time.
>
> If the community agrees we are happy to contribute this upgrade as we have
> already developed and tested the updated version.
>
> Cheers,
> Gyula
>


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-18 文章 wangweigu...@stevegame.cn

  可以通过异步的方式(RichAsyncFunction)进行维表关联操作,异步多线程方式进行维表读取!



 
发件人: Jim Chen
发送时间: 2020-06-19 10:34
收件人: user-zh
主题: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性
请问下,在flink sql1.10中, localcache+异步IO,这个方案,是直接写sql关联维表就行了吗?flink
sql会自动在底层做优化工作吗?如果要自己手动实现的话,有没有什么demo呢?谢谢
 
Jark Wu  于2020年6月17日周三 上午12:11写道:
 
> 如果更新非常频繁,又要保证关联的准确性,又要保证吞吐,那么最佳的解决方案我觉得只能是关联 changelog 了,
> 只是 Flink 目前还没有原生支持维表关联一个 changelog,会在Flink SQL 1.12中去支持。
>
> 当前版本下的话,可以尝试 keyby+localcache+异步IO。
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > 或者采用redis做维表存储介质。
> >
> > > 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> > >
> > > hi,大家
> > > 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
> >
>


Re: flink 1.10 on yarn 内存超用,被kill

2020-06-18 文章 Yun Tang
Hi

单个Slot的managed memory是多少(可以通过webUI或者TM的日志观察到),rocksDB的 block cache 
usage会增长到多少,是一直在增长最终超过单个slot的managed memory么?

RocksDB的内存托管在绝大部分场景下是work的,但是RocksDB本身的实现限制了这个功能完美发挥作用。具体涉及到LRUcache和Writebuffer
 manager之间的对应关系,目前RocksDB的strict cache limit和将write buffer 
manager的内存申请“托管”到cache的功能是不完整的,即使在cache达到capacity的情况下,仍然可以申请内存并插入,所以存在小概率的情况会出现内存超用。

想要绕开这个问题,可以先增大TM的process内存,来增大overhead 内存 [1],可以给RocksDB提供一定的buffer。

从RocksDB的角度的话,增大flush线程数以及降低arena 的size可以降低该问题出现概率。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

祝好
唐云



From: 1017517291 <1017517...@qq.com>
Sent: Friday, June 19, 2020 10:31
To: user-zh 
Subject: flink 1.10 on yarn 内存超用,被kill

Hi,
  社区的大大们好,请教个问题,我最近一个作业在线上运行,运行一天左右就被yarn kill掉了,提示超过内存限制,打印日志如下:


java.lang.Exception: [2020-06-19 00:33:57.249]Container 
[pid=771992,containerID=container_e05_1592035979967_0057_01_06] is running 
745472B beyond the 'PHYSICAL' memo
ry limit. Current usage: 10.0 GB of 10 GB physical memory used; 19.1 GB of 21 
GB virtual memory used. Killing container.



我的flink版本是1.10,分配的单TaskManager内存为10g, 
查看监控,JVM堆内存一直比较平稳,怀疑是RocksDB使用的存储比较大,打开了RocksDB的block usage监控,确实一直在增长;
然后我调整了taskmanager.memory.managed.fraction为:0.7,但是还是block usage一直增长,最终被kill掉;
我已经开启了state.backend.rocksdb.memory.managed = 
true,按理说,RocksDB托管内存不会一直增长,看1.10官方文档介绍,RocksDB的使用内存会被控制在托管内存之内,但是我的作业使用的block 
usage一直在增长,最终导致被容器kill掉;


想问下
1. 什么情况下RocksDB的内存不受控制,一直增长,超过分配的managedmemory
2. 在1.10版本中,还有什么情况,会导致内存超过限制,被yarn kill掉


期待各位大佬回复


flink的高可用配置

2020-06-18 文章 Tony


Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-18 文章 Jim Chen
请问下,在flink sql1.10中, localcache+异步IO,这个方案,是直接写sql关联维表就行了吗?flink
sql会自动在底层做优化工作吗?如果要自己手动实现的话,有没有什么demo呢?谢谢

Jark Wu  于2020年6月17日周三 上午12:11写道:

> 如果更新非常频繁,又要保证关联的准确性,又要保证吞吐,那么最佳的解决方案我觉得只能是关联 changelog 了,
> 只是 Flink 目前还没有原生支持维表关联一个 changelog,会在Flink SQL 1.12中去支持。
>
> 当前版本下的话,可以尝试 keyby+localcache+异步IO。
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > 或者采用redis做维表存储介质。
> >
> > > 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> > >
> > > hi,大家
> > > 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
> >
>


flink 1.10 on yarn ????????????kill

2020-06-18 文章 1017517291
Hi??
  
??yarn 
kill??


java.lang.Exception: [2020-06-19 00:33:57.249]Container 
[pid=771992,containerID=container_e05_1592035979967_0057_01_06] is running 
745472B beyond the 'PHYSICAL' memo
ry limit. Current usage: 10.0 GB of 10 GB physical memory used; 19.1 GB of 21 
GB virtual memory used. Killing container.



flink??1.10??TaskManager??10g, 
??JVM??RocksDBRocksDB??block
 usage??
taskmanager.memory.managed.fraction0.7??block 
usagekill
state.backend.rocksdb.memory.managed = 
true??RocksDB1.10??RocksDBblock
 usage??kill


??
1. ??RocksDBmanagedmemory
2. ??1.10yarn kill??




pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

2020-06-18 文章 jack
使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?


场景:使用pyflink通过filter进行条件过滤后插入到sink中,
比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
{
"logType":"syslog",
"message":"sla;flkdsjf"
}
{
"logType":"alarm",
"message":"sla;flkdsjf"
}
  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")
有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
if logType=="syslog":
   insert_into(sink1)
elif logType=="alarm":
   insert_into(sink2)


如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:


  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")\
  .filter("logType=alarm")\
  .insert_into("sink2")
请各位大牛指点,感谢







Re:Re: 项目引用flink-1.11.0,打包失败

2020-06-18 文章 Zhou Zach
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

















在 2020-06-18 19:41:08,"Jark Wu"  写道:
>能贴下完整代码吗? (imports 部分)
>
>Best,
>Jark
>
>On Thu, 18 Jun 2020 at 19:18, Zhou Zach  wrote:
>
>>
>>
>> flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTableEnvironment,换成flink-1.11.0时,intellij
>> idea提示要换成org.apache.flink.table.api.bridge.java.StreamTableEnvironment,Intellij
>> Idea Build可以成功,就是打包的时候出错。。
>>
>>
>>
>>
>> [ERROR]
>> /Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
>> error: object StreamTableEnvironment is not a member of package
>> org.apache.flink.table.api.bridge.java
>> [ERROR] import
>> org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>>
>>
>>
>>
>> 代码:
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> streamExecutionEnv.enableCheckpointing(20 * 1000,
>> CheckpointingMode.EXACTLY_ONCE)
>> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)
>>
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
>> blinkEnvSettings)
>> pom.xml:
>> 
>>   UTF-8
>> 
>> 1.11-SNAPSHOT
>>   1.8
>>   2.11.12
>>   2.11
>>   ${java.version}
>>   ${java.version}
>>
>> 
>>  org.apache.flink
>>  flink-java
>>  ${flink.version}
>> 
>> 
>>   
>>  org.apache.flink
>>
>>  flink-streaming-java_${scala.binary.version}
>>  ${flink.version}
>> 
>> 
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table
>>  ${flink.version}
>>  pom
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-scala_2.11
>>  ${flink.version}
>> 
>> 
>>   
>>  org.apache.flink
>>  flink-jdbc_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-streaming-scala_2.11
>>  ${flink.version}
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-table-common
>>  ${flink.version}
>> 
>> 
>> 
>> 
>>  org.apache.flink
>>  flink-table-api-scala-bridge_2.11
>>  ${flink.version}
>> 
>> 
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table-api-scala_2.11
>>  ${flink.version}
>> 
>> 
>>
>>
>>
>>
>> 
>>
>>   
>>   
>>
>>
>> 
>>  org.apache.flink
>>  flink-connector-kafka_2.11
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  flink-avro
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  flink-csv
>>  ${flink.version}
>>  provided
>>   
>> 
>> 
>>  org.apache.flink
>>  flink-json
>>  ${flink.version}
>>  provided
>>   
>>
>>
>> 
>>
>>
>> 
>>  org.apache.bahir
>>  flink-connector-redis_2.11
>>  1.0
>>  provided
>>   
>>
>> 
>> 
>>  org.apache.flink
>>  flink-connector-hive_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>> 
>> 
>> 
>> 
>> 
>> 
>>
>> 
>>  org.apache.flink
>>  flink-table-api-java
>>  ${flink.version}
>>  provided
>>   
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table-planner_2.11
>>  ${flink.version}
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-table-planner-blink_2.11
>>  ${flink.version}
>>  provided
>>   
>> 
>> 
>>  org.apache.flink
>>  flink-sql-connector-kafka_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>
>>   
>>  org.apache.flink
>>  flink-connector-hbase_2.11
>>  ${flink.version}
>>   


回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Sun.Zhu
非常感谢,我去试试


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 18:13,Rui Li 写道:
需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。
最简单的场景,在本地启动metastore server命令:hive --service metastore
hive.metastore.uris设置成:thrift://localhost:9083

更详细的metastore使用方法可以参考hive文档:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote:

对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li 写道:

是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li



--
Best regards!
Rui Li


sql-client????????????????SUCCEEDED????

2020-06-18 文章 MuChen
hi,

yarn-session:bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink 
-nm fsql-cli  /dev/null 21 

sql-clientsql??

kafkahive??joinmysql 
sql??
#  -- 
??5??vid??vid_group 
-- ??55mysql insert into 
rt_app.app_video_cover_abtest_test  select  begin_time,  vid,  vid_group,  
max(dv),  max(click),  max(vv),  max(effectivevv) from(  select   t1.begin_time 
begin_time,   t1.u_vid vid,   t1.u_vid_group vid_group,   dv,   click,   vv,   
if(effectivevv is null,0,effectivevv) effectivevv  from  (   -- dv??click??vv   
select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,   
 cast(u_vid as bigint) u_vid,u_vid_group,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and 
u_c_module='M011',1,0)) dv,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and 
u_c_module='M011',1,0)) click,sum(if(concat(u_mod,'-',u_ac)='top-hits' and 
u_f_module='M011',1,0)) vv   FROM rt_ods.ods_applog_vidsplit   where u_vid is 
not null and trim(u_vid)<''and u_vid_group is not null and 
trim(u_vid_group) not in ('','-1')and (  (concat(u_mod,'-',u_ac) in 
('emptylog-video_display','emptylog-video_click')  and u_c_module='M011')  or  
(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011') )   group by 
TUMBLE(bjdt, INTERVAL '5' MINUTE),cast(u_vid as bigint),u_vid_group  ) 
t1  left join  (   -- effectivevv   selectbegin_time,u_vid,
u_vid_group,count(1) effectivevv   from   (select  begin_time,  u_vid,  
u_vid_group,  u_diu,  u_playid,  m_pt,  q70fromdw.video_pic_title_q70 a 
   join( select   CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS 
STRING) begin_time,  cast(u_vid as bigint) u_vid,  u_vid_group,  u_diu,  
u_playid,  max(u_playtime) m_pt FROM rt_ods.ods_applog_vidsplit where 
u_vid is not null and trim(u_vid)<''  and u_vid_group is not null and 
trim(u_vid_group) not in ('','-1')  and 
concat(u_mod,'-',u_ac)='emptylog-video_play_speed'  and u_f_module='M011'  and 
u_playtime0 group by   TUMBLE(bjdt, INTERVAL '5' MINUTE),  cast(u_vid 
as bigint),  u_vid_group,  u_diu,  u_playid) bon a.vid=b.u_vidgroup 
by   begin_time,  u_vid,  u_vid_group,  u_diu,  u_playid,  m_pt,  q70   ) temp  
 where m_pt=q70   group by begin_time,u_vid,u_vid_group  ) t2  
on t1.begin_time=t2.begin_time   and t1.u_vid=t2.u_vid   and 
t1.u_vid_group=t2.u_vid_group )t3   group by begin_time,  vid,  vid_group ; 
succeeded??https://s1.ax1x.com/2020/06/18/NnyX24.png

INFO??:
2020-06-17 21:27:07,968 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted 
while waiting for queue java.lang.InterruptedException at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:201
 4) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 
ps: 1. kafka
2. flink1.10.0
??SUCCEEDED

??

flink启动主类反射异常

2020-06-18 文章 a773807...@gmail.com
大家好:
  我在flink的启动主类上,配置了代码,根据入参的参数,动态反射加载对应的类来启动不同的flink job, 
在本地拉起是可以实现这个功能,但是部署到集群上的时候,就显示反射异常,请问是什么问题?

具体日志:
2020-06-18 20:14:06,354 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler   - Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: null
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
com.huanju.riskplatform.flinkfactor.StartPoint.main(StartPoint.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at 
com.huanju.riskplatform.flinkfactor.model.ModelThree.analysisRun(ModelThree.java:349)
... 19 more



a773807...@gmail.com


Re: 项目引用flink-1.11.0,打包失败

2020-06-18 文章 Jark Wu
能贴下完整代码吗? (imports 部分)

Best,
Jark

On Thu, 18 Jun 2020 at 19:18, Zhou Zach  wrote:

>
>
> flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTableEnvironment,换成flink-1.11.0时,intellij
> idea提示要换成org.apache.flink.table.api.bridge.java.StreamTableEnvironment,Intellij
> Idea Build可以成功,就是打包的时候出错。。
>
>
>
>
> [ERROR]
> /Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
> error: object StreamTableEnvironment is not a member of package
> org.apache.flink.table.api.bridge.java
> [ERROR] import
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>
>
>
>
> 代码:
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(20 * 1000,
> CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)
>
> val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
> pom.xml:
> 
>   UTF-8
> 
> 1.11-SNAPSHOT
>   1.8
>   2.11.12
>   2.11
>   ${java.version}
>   ${java.version}
>
> 
>  org.apache.flink
>  flink-java
>  ${flink.version}
> 
> 
>   
>  org.apache.flink
>
>  flink-streaming-java_${scala.binary.version}
>  ${flink.version}
> 
> 
>
> 
> 
>  org.apache.flink
>  flink-table
>  ${flink.version}
>  pom
> 
> 
>
>   
>  org.apache.flink
>  flink-scala_2.11
>  ${flink.version}
> 
> 
>   
>  org.apache.flink
>  flink-jdbc_2.11
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-streaming-scala_2.11
>  ${flink.version}
> 
> 
>
>   
>  org.apache.flink
>  flink-table-common
>  ${flink.version}
> 
> 
> 
> 
>  org.apache.flink
>  flink-table-api-scala-bridge_2.11
>  ${flink.version}
> 
> 
>
> 
> 
>  org.apache.flink
>  flink-table-api-scala_2.11
>  ${flink.version}
> 
> 
>
>
>
>
> 
>
>   
>   
>
>
> 
>  org.apache.flink
>  flink-connector-kafka_2.11
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-avro
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-csv
>  ${flink.version}
>  provided
>   
> 
> 
>  org.apache.flink
>  flink-json
>  ${flink.version}
>  provided
>   
>
>
> 
>
>
> 
>  org.apache.bahir
>  flink-connector-redis_2.11
>  1.0
>  provided
>   
>
> 
> 
>  org.apache.flink
>  flink-connector-hive_2.11
>  ${flink.version}
>  provided
>   
>
> 
> 
> 
> 
> 
> 
>
> 
>  org.apache.flink
>  flink-table-api-java
>  ${flink.version}
>  provided
>   
>
> 
> 
>  org.apache.flink
>  flink-table-planner_2.11
>  ${flink.version}
> 
> 
>
>   
>  org.apache.flink
>  flink-table-planner-blink_2.11
>  ${flink.version}
>  provided
>   
> 
> 
>  org.apache.flink
>  flink-sql-connector-kafka_2.11
>  ${flink.version}
>  provided
>   
>
>
>   
>  org.apache.flink
>  flink-connector-hbase_2.11
>  ${flink.version}
>   


Flink 多Sink 数据一致性保证

2020-06-18 文章 xueaohui_...@163.com

如上图所示,目前通过把作业加入多个sink,这种场景下面当hbase写入的失败的时候,不影响kakfa的写入。期望hbase写入失败,kafka也不发送。
如何保证hbase和kafka的写入为原子性呢?
不知道flink是否有多sink的二阶段提交方案。



xueaohui_...@163.com


项目引用flink-1.11.0,打包失败

2020-06-18 文章 Zhou Zach


flink-1.10.0版本,引用的是org.apache.flink.table.api.java.StreamTableEnvironment,换成flink-1.11.0时,intellij
 
idea提示要换成org.apache.flink.table.api.bridge.java.StreamTableEnvironment,Intellij 
Idea Build可以成功,就是打包的时候出错。。




[ERROR] 
/Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
 error: object StreamTableEnvironment is not a member of package 
org.apache.flink.table.api.bridge.java
[ERROR] import org.apache.flink.table.api.bridge.java.StreamTableEnvironment




代码:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(20 * 1000, 
CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)

val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
blinkEnvSettings)
pom.xml:

  UTF-8

1.11-SNAPSHOT
  1.8
  2.11.12
  2.11
  ${java.version}
  ${java.version}
   

 org.apache.flink
 flink-java
 ${flink.version}


  
 org.apache.flink
 flink-streaming-java_${scala.binary.version}
 ${flink.version}





 org.apache.flink
 flink-table
 ${flink.version}
 pom



  
 org.apache.flink
 flink-scala_2.11
 ${flink.version}


  
 org.apache.flink
 flink-jdbc_2.11
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-streaming-scala_2.11
 ${flink.version}



  
 org.apache.flink
 flink-table-common
 ${flink.version}




 org.apache.flink
 flink-table-api-scala-bridge_2.11
 ${flink.version}





 org.apache.flink
 flink-table-api-scala_2.11
 ${flink.version}








  
  



 org.apache.flink
 flink-connector-kafka_2.11
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-avro
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-csv
 ${flink.version}
 provided
  


 org.apache.flink
 flink-json
 ${flink.version}
 provided
  






 org.apache.bahir
 flink-connector-redis_2.11
 1.0
 provided
  



 org.apache.flink
 flink-connector-hive_2.11
 ${flink.version}
 provided
  









 org.apache.flink
 flink-table-api-java
 ${flink.version}
 provided
  



 org.apache.flink
 flink-table-planner_2.11
 ${flink.version}



  
 org.apache.flink
 flink-table-planner-blink_2.11
 ${flink.version}
 provided
  


 org.apache.flink
 flink-sql-connector-kafka_2.11
 ${flink.version}
 provided
  


  
 org.apache.flink
 flink-connector-hbase_2.11
 ${flink.version}
  

Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Rui Li
需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。
最简单的场景,在本地启动metastore server命令:hive --service metastore
hive.metastore.uris设置成:thrift://localhost:9083

更详细的metastore使用方法可以参考hive文档:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote:

> 对应这种改动还是挺大的,有对应的说明文档吗?
> hive.metastore.uris 这个需要怎么配置,有样例吗?
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月18日 17:01,Rui Li 写道:
>
> 是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。
>
> On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:
>
>
> Hi
>
> 在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:
>
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
> metastore 并在conf文件配置 hive.metastore.uris
>
> Best,
> Leonard Xu
>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best regards!
Rui Li


Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 Leonard Xu
Hello

1.7.2是比较老的版本了, 可以考虑下升级新的版本,新的版本都支持你所需的功能的。

1.10.0 && 1.10.1 文档[1],对应的两个参数:

  'connector.write.flush.max-rows' = '5000', -- optional, flush max size 
(includes all append, upsert and delete records), 
 -- 
over this number of records, will flush data. The default value is "5000".
  'connector.write.flush.interval' = '2s', --optional, flush interval 
mills, over this time, asynchronous threads will flush data.
如果使用1.10版本,推荐使用1.10.1版本,1.10.1在1.10.0的基础上修复了一些bug。

社区即将发布的1.11.0 文档[2], 对应的两个参数:
sink.buffer-flush.max-rows  -- The max size of buffered records before 
flush. Can be set to zero to disable it.
sink.buffer-flush.interval  -- The flush interval mills, over this time, 
asynchronous threads will flush data. 
-- Can be set to '0' to disable 
it. Note, 'sink.buffer-flush.max-rows' can be set to '0' 
-- with the flush interval set 
allowing for complete async processing of buffered actions.

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
 

 
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options
 

 


> 在 2020年6月18日,17:04,nicygan  写道:
> 
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
> 
> 
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>.setDrivername("com.mysql.jdbc.Driver")
>.setDBUrl("jdbc:mysql://localhost:3306/flink")
>.setUsername("root")
>.setPassword("123456")
>.setQuery(sql2)
>.setParameterTypes(types)
> .setBatchSize(1000)
>   .build();
> 
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
> 
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
> 
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
> 
> thanks



Re: flink1.10.1 SQL 作业 netty报错, 求帮助

2020-06-18 文章 Jark Wu
看起来是一个已知问题: https://issues.apache.org/jira/browse/FLINK-17479

On Wed, 17 Jun 2020 at 11:00, hb <343122...@163.com> wrote:

> flink1.10.1 写的 SQL 作业, 开始运行3个小时正常,  checkpoint也正常.
> 然后,checkpoint失败了, 作业一直卡在RESTARTING 状态不动.
>
> TaskManager 日志:
> 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered
> group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null)
> 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5
> c29783b8f7ed8bfb1a7723f5c4216b1).
> 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, 

Re: flink sql 窗口场景的问题

2020-06-18 文章 Leonard Xu
Hi, 

窗口输出可以加emit策略,在watermark未触发时提前输出window的中间结果,不过社区目前标注的是experimental的功能,生产环境中应谨慎使用。
table.exec.emit.early-fire.enabled
table.exec.emit.early-fire.delay
可以参考[1]。

Best
Leonard Xu

[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L174
 

 



Re: FileInputFormat 使用问题

2020-06-18 文章 john
嗨,找到问题了吗?我也遇到了

> 2020年6月1日 下午2:48,阿华田  写道:
> 
> //初始化任务参数
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> FileInputFormat fileInputFormat = new TextInputFormat(new 
> Path("hdfs://arc/success_fid_flow "));
> fileInputFormat.setNestedFileEnumeration(true);
> //过滤掉条件为true
> fileInputFormat.setFilesFilter(new 
> RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
> DataSet source =env.createInput(fileInputFormat);
> source.output(new HdfsTrainSinktest());



Re:Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 nicygan



请问timeout值是多少?在哪里可设置?



在 2020-06-18 17:43:31,"Benchao Li"  写道:
>我理解现在就是你想要的效果。
>batch-size和timeout两个条件是达到一个就会flush的。
>
>nicygan  于2020年6月18日周四 下午5:05写道:
>
>> dear all:
>>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>>
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>> .setDrivername("com.mysql.jdbc.Driver")
>> .setDBUrl("jdbc:mysql://localhost:3306/flink")
>> .setUsername("root")
>> .setPassword("123456")
>> .setQuery(sql2)
>> .setParameterTypes(types)
>>  .setBatchSize(1000)
>>.build();
>>
>> === 问题 
>> 如果上游数据来源时间是:
>> 10:00 -> 900条
>> 10:10 -> 120条
>> 11:50 -> 1100条
>> 15:00 -> 900条
>>
>> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
>> 10:10 -> 写入1000条,剩20条下次写入
>> 11:50 -> 写入1000条,剩30条下次写入
>> 15:00 -> 写入1000条,剩10条下次写入
>>
>> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
>> 10:10 -> 写入1000条,剩20条下次写入
>> 10:30 -> 写入20条
>> 11:50 -> 写入1000条,剩10条下次写入
>> 12:10 -> 写入10条
>> 15:20 -> 写入900条
>>
>> thanks
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 Benchao Li
我理解现在就是你想要的效果。
batch-size和timeout两个条件是达到一个就会flush的。

nicygan  于2020年6月18日周四 下午5:05写道:

> dear all:
>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("jdbc:mysql://localhost:3306/flink")
> .setUsername("root")
> .setPassword("123456")
> .setQuery(sql2)
> .setParameterTypes(types)
>  .setBatchSize(1000)
>.build();
>
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks
>


-- 

Best,
Benchao Li


Re: flink sql 窗口场景的问题

2020-06-18 文章 john
嗨,推荐你使用这个:窗口实用触发器 ContinuousEventTimeTrigger

> 2020年6月3日 下午10:29,Sun.Zhu <17626017...@163.com> 写道:
> 
> hi
> 你是要每条数据都计算当前5分钟内的聚合值吗?如果是这样的话可以考虑使用over window
> 
> 
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
> 
> 
> 在2020年06月3日 02:56,steven chen 写道:
> hi :
> 我现在遇到有这样一个场景,我们需要实时去统计5分和30分的粒度,flink sql 窗口使用了处理时间滑动窗口方式
> 但是都是只有5分结束的时候才能把聚合结果输出,这个不满足我们需求,有没有方式可以直接实时输出结果,比如18:02 的统计+1+1 
> 都能直接落在18:00-18:05的窗口上,并每次+1都能实时输出,而不是等到窗口结束才sink 到mysql .30分钟我同样



回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Sun.Zhu
对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li 写道:
是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li


flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据

2020-06-18 文章 nicygan
dear all:
 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。


JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("123456")
.setQuery(sql2)
.setParameterTypes(types)
 .setBatchSize(1000)
   .build();

=== 问题 
如果上游数据来源时间是:
10:00 -> 900条
10:10 -> 120条
11:50 -> 1100条
15:00 -> 900条

JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
10:10 -> 写入1000条,剩20条下次写入
11:50 -> 写入1000条,剩30条下次写入
15:00 -> 写入1000条,剩10条下次写入

我想要达到等待20分种,不满足batchSize也写入,能否实现?
10:10 -> 写入1000条,剩20条下次写入
10:30 -> 写入20条
11:50 -> 写入1000条,剩10条下次写入
12:10 -> 写入10条
15:20 -> 写入900条

thanks


sql-client????????????????SUCCEEDED????

2020-06-18 文章 MuChen
hi,


yarn-session:bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink 
-nm fsql-cli  /dev/null 21 
sql-clientsql??
kafkahive??joinmysql


??succeeded??







ps:kafka
??SUCCEEDED


??

Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Rui Li
是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:

>
> Hi
>
> > 在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:
> >
> > Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
> metastore 并在conf文件配置 hive.metastore.uris
>
> Best,
> Leonard Xu



-- 
Best regards!
Rui Li


Re: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Leonard Xu
 
Hi

> 在 2020年6月18日,16:45,Sun.Zhu <17626017...@163.com> 写道:
> 
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
> allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive metastore 
并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu

回复: sqlclient集成hiveCatalog查询kafka表问题

2020-06-18 文章 Sun.Zhu
Hi,Rui Li
我把connector的包也替换成1.11的了,结果sql-cli启动报错
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:818)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:230)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
allowed. Make sure you have set a valid value for hive.metastore.uris
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:171)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:157)
at 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:366)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$6(ExecutionContext.java:565)
at java.util.HashMap.forEach(HashMap.java:1289)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:564)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:252)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:563)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:512)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:171)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:124)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:807)


hive catalog的配置和1.10.1一样,如下:
catalogs: #[] # empty list
# A typical catalog definition looks like:
  - name: myhive
type: hive
hive-conf-dir: /Users/zhushang/Desktop/software/apache-hive-2.2.0-bin/conf
hive-version: 2.2.0
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 15:46,Rui Li 写道:
第三方包指的是flink-connector-hive这种吗?这些包在build的时候也会打出来的,只不过没有加到flink-dist里。到对应的module里找一下,比如flink-connector-hive会在/flink-connectors/flink-connector-hive/target下面。

On Thu, Jun 18, 2020 at 12:22 PM Jark Wu  wrote:

你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/
自己编译一下:mvn clean install -DskipTests
在 build-target 下就是打出来的 1.11 的分发包内容。

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:



是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月17日 13:25,Rui Li 写道:
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17626017...@163.com> 于2020年6月17日周三 上午12:49写道:

是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17626017...@163.com> 于2020年6月16日周二 下午6:11写道:

我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17626017...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 <
https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka 

回复:env.readFile 递归监控目录 如何清理状态(历史目录)

2020-06-18 文章 star
感谢您的建议!如果我把hdfs目录删掉,flink里对应的状态也会清掉吗?



发自我的iPhone


-- 原始邮件 --
发件人: Jark Wu https://issues.apache.org/jira/browse/FLINK-18357;
我的一个初步的想法是,是否可以有一个 inactive-interval 去标记一个子目录已经不会有新文件产生了,这样 checkpoint
就不用跟踪这个子目录下的所有文件。


Best,
Jark

On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com wrote:

 nbsp;


 env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)


 上面是一个监控目录里的数据的source
 format设置成递归监控一个父目录A , A下面是日期目录 ,如:


 A/20200101/
 A/20200102/
 A/20200103/
 ...
 ...



 
随着时间的增加,比如到6月需要监控近200个目录,每个目录又有500个文件,每次ck需要同步的状态就是200*500个文件的消费的offset,这样经常ck超时,


 请问这种可以清理历史状态吗,比如业务上知乎有近7天的目录才会有数据更新,历史的就可以不监控了。

Re: env.readFile 递归监控目录 如何清理状态(历史目录)

2020-06-18 文章 Jark Wu
Hi,

我觉得这个也许可以先从业务上解决。比如你可以有另一个作业定期去 HDFS 上把过期的数据清理掉(比如半个月前的?)。

另外,我也开了一个 issue 去跟进这个问题,看看社区里面对这块比较熟的同学有没有更好的建议。FLINK-18357

我的一个初步的想法是,是否可以有一个 inactive-interval 去标记一个子目录已经不会有新文件产生了,这样 checkpoint
就不用跟踪这个子目录下的所有文件。


Best,
Jark

On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com> wrote:

> 
>
>
> env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)
>
>
> 上面是一个监控目录里的数据的source
> format设置成递归监控一个父目录A , A下面是日期目录 ,如:
>
>
> A/20200101/
> A/20200102/
> A/20200103/
> ...
> ...
>
>
>
> 随着时间的增加,比如到6月需要监控近200个目录,每个目录又有500个文件,每次ck需要同步的状态就是200*500个文件的消费的offset,这样经常ck超时,
>
>
> 请问这种可以清理历史状态吗,比如业务上知乎有近7天的目录才会有数据更新,历史的就可以不监控了。


Re: Flink kerberos环境下多个keytab认证问题

2020-06-18 文章 john
不必写在配置文件里,在提交的时候使用 -yD 动态指定参数。-yD use value for given 
property。这个参数可以多个。
另外在提交的时候,可以使用klist命令看下默认的principal是哪个。
使用:kinit -kt .keytab  可以更改default principal 也就是切换用户。你可以试下。

> 2020年6月12日 上午11:36,zhangjunjie1...@163.com 写道:
> 
>Flink1.9环境下,搭建Flink on yarn平台,用户之间实现租户/资源隔离,在kerberos环境下运行Flink 
> perjob模式,需要在Flink-conf.yaml中添加:
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
>security.kerberos.login.principal: p_zhangjun...@local.com 
>但是如果多个用户使用Flink环境资源,比如说除了p_zhangjunjie,还是p_wanglin,然后我在Flink-conf.yaml中添加:
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
>security.kerberos.login.principal: p_zhangjun...@local.com 
> 
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_wanglin.keytab
>security.kerberos.login.principal: p_wang...@local.com 
>但是只有最下面的一个(比如p_wanglin)生效。使用p_zhangjunjie就报错:那如何实现多个keytab用户同时生效呢?
> 
> 谢谢!麻烦大家帮忙解决一下了。哪怕提供个思路都可以。
> 
> 
> 
> 
> 
> zhangjunjie1...@163.com