Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-27 Thread Kai Fu
Hi team,

We noticed that the UPDATE_BEFORE row kind is not dropped but is instead
treated as DELETE in the code.
We're aware that this is useful for retracting output records in some cases,
but we cannot come up with such a scenario ourselves. Could anyone name a few
cases where it matters?

The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
connector to reduce the sink traffic, since almost all of our records are
updates. In our case, the records are generated by joining a couple of
upsert-kafka data sources. Only primary keys participate in the join
conditions for all joins, with some granularity/cardinality fan-out in
the middle. We want to know whether dropping the records with the
UPDATE_BEFORE row kind would affect the correctness of the final result.
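
For concreteness, here is a minimal sketch of the kind of pipeline described
above (all table, topic, and column names are invented for illustration): two
upsert-kafka sources joined on their primary keys and written to an
Elasticsearch sink keyed by the join key. Whether UPDATE_BEFORE rows actually
reach the sink depends on the changelog mode the planner negotiates, so this
only illustrates the shape of the job, not a definitive reproduction.

CREATE TABLE orders (
  order_id STRING,
  user_id  STRING,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE users (
  user_id STRING,
  city    STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

CREATE TABLE es_sink (
  order_id STRING,
  city     STRING,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://...:9200',
  'index' = 'orders_enriched'
);

-- Each change on either input is emitted by the join as an UPDATE_BEFORE /
-- UPDATE_AFTER pair; in the sink code referenced above, UPDATE_BEFORE is
-- handled like DELETE, i.e. as a delete request by document id.
INSERT INTO es_sink
SELECT o.order_id, u.city, o.amount
FROM orders o
JOIN users u ON o.user_id = u.user_id;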

-- 
*Best wishes,*
*- Kai*


Re: [Question] Why doesn't Flink use Calcite adapter?

2021-06-27 Thread JING ZHANG
Hi guangyuan,
This question touches an interesting and broad topic. I'll try to give my
opinion based on my limited knowledge.

Flink introduces dynamic sources to read from external systems[1]. The Flink
connector modules are completely decoupled from Calcite. There are two
benefits:
(1) Users who need to develop a custom, user-defined connector require no
background knowledge of Calcite.
(2) Unnecessary external dependencies are kept out of the Flink connector modules.

Besides, since Flink is a distributed engine for stateful computations over
*unbounded and bounded* data streams, there is more to consider when
connecting to an external system.
For example: how to read data with multiple parallel instances, and how to
store metadata in state so that reading can resume after a failover.
I list a few such issues below (a small user-facing SQL sketch follows the
list). These issues are strongly tied to the Flink engine and are not covered
by Calcite's built-in adapters.
(1) Required: define how to read from the external storage system
 1.1 scan all rows, or look up rows by one or more keys
 1.2 if scan mode is chosen, define how to split the source and how to store
metadata in state so that reading can be recovered after a failover.
(2) Required: map the data types of the external system to Flink's data type
system.
(3) Optional: for planner optimizations, implement ability interfaces such as
SupportsProjectionPushDown / SupportsFilterPushDown.
(4) Optional: define encoding / decoding formats.
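
As a rough user-facing illustration of where these concerns surface (a sketch
with invented table names, not taken from this thread): the 'connector' option
selects the external system (1), the column definitions map its types to
Flink's type system (2), the 'format' option picks the decoding format (4),
and FOR SYSTEM_TIME AS OF turns a table into a lookup source instead of a scan
source (1.1).

CREATE TABLE clicks (
  user_id  STRING,
  url      STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',                    -- (1) external system
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'                         -- (4) decoding format
);

CREATE TABLE user_dim (
  user_id STRING,                           -- (2) type mapping
  city    STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://.../db',
  'table-name' = 'user_dim'
);

-- (1.1) look up rows by key instead of scanning the whole table
SELECT c.url, d.city
FROM clicks c
JOIN user_dim d FOR SYSTEM_TIME AS OF c.proctime
ON c.user_id = d.user_id;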

Hope it helps. Please correct me if I'm wrong.

Best regards,
JING ZHANG

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/

Israel Ekpo wrote on Mon, Jun 28, 2021 at 8:28 AM:

> Maybe this question was better addressed to the DEV list.
>
> On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang 
> wrote:
>
>>
>> 
>>
>> I have read the design doc of the Flink planner recently. I've found that
>> Flink only uses Calcite as an SQL optimizer. It translates an optimized
>> RelNode to a Flink (or Blink) RelNode, and then transforms it into the
>> physical plan. Why doesn't Flink implement Calcite adapters? Isn't that an
>> easier way to use Calcite?
>>
>>
>> The link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html
>>
>


Unsubscribe

2021-06-27 Thread 高耀军
Unsubscribe

[Flink SQL] Lookup join hbase problem

2021-06-27 Thread 纳兰清风
Hi,


  When I was using an HBase table as my lookup table, I got this error:


Caused by: java.lang.IllegalArgumentException: Currently, HBase table can 
only be lookup by single row key.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)


My SQL is


insert into sink_kafka(user_ucid,city_code,`source`,system_type)
SELECT t1.ucid AS user_ucid,
 t1.city_code,
 t1.`source`,
 t1.system_type
FROM tmp_ucid_check t1
LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache for SYSTEM_TIME AS OF 
t1.proctime AS t2
ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
WHERE t2.city_code is NOT null
AND t1.city_code = t2.city_code; 


Maybe it is because the condition in the WHERE clause is being pushed down as
a predicate into the join clause?
How can I solve this problem?


Thank you
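
One possible direction, sketched here with the original table names but
untested: since the row key already encodes city_code via the concat, the
extra equality t1.city_code = t2.city_code may be redundant. Dropping it
leaves the row-key comparison as the only equi-condition, so only a single
lookup key should be derived, while the IS NOT NULL check still filters out
rows without a match.

insert into sink_kafka(user_ucid,city_code,`source`,system_type)
SELECT t1.ucid AS user_ucid,
 t1.city_code,
 t1.`source`,
 t1.system_type
FROM tmp_ucid_check t1
LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache FOR SYSTEM_TIME AS OF
t1.proctime AS t2
ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
WHERE t2.city_code IS NOT NULL;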

Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread zhisheng
Check how you pulled in the jar dependency; is its scope set to provided?

Wei JI10 季伟 wrote on Mon, Jun 28, 2021 at 12:19 PM:

> Hi,
> Both versions are 1.12.3. Is there any other way to locate where this problem comes from?
>
> On 2021/6/28 at 11:59 AM, "Jingsong Li" wrote:
>
>
> Hi, could you check your versions? Are the cluster and flink-parquet the same version?
>
> BEST,
> Jingsong
>
> On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟
> wrote:
>
> > Hi,
> > It is not the sql client; the job runs on yarn. The submitted jar includes the dependencies above, and then I see this error.
> >
> >
>
> --
> Best, Jingsong Lee
>
>


Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Wei JI10 季伟
Hi,
Both versions are 1.12.3. Is there any other way to locate where this problem comes from?

On 2021/6/28 at 11:59 AM, "Jingsong Li" wrote:


Hi, could you check your versions? Are the cluster and flink-parquet the same version?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> Hi,
> It is not the sql client; the job runs on yarn. The submitted jar includes the dependencies above, and then I see this error.
>
>

--
Best, Jingsong Lee



Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Jingsong Li
Hi, could you check your versions? Are the cluster and flink-parquet the same version?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> Hi,
> It is not the sql client; the job runs on yarn. The submitted jar includes the dependencies above, and then I see this error.
>
>

-- 
Best, Jingsong Lee


Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Wei JI10 季伟
Hi,
It is not the sql client; the job runs on yarn. The submitted jar includes the dependencies above, and then I see this error.



Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread zhisheng
Are you testing the SQL with the sql client? If so, remember to add the flink-sql-parquet jar
to the flink lib directory, then restart the cluster and the sql client.

Wei JI10 季伟 wrote on Mon, Jun 28, 2021 at 9:35 AM:

> Hi,
> The parquet dependencies I added are as follows; I am not sure whether they are complete.
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-parquet_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-avro</artifactId>
>     <version>1.10.1</version>
> </dependency>
>
>
>


Re: Reply: How to implement window topN with flink 1.12

2021-06-27 Thread zhisheng
可以将 1.13 的这个功能打在 flink 1.12 上面,然后引用你们新打的依赖

casel.chen wrote on Wed, Jun 23, 2021 at 12:08 PM:

> -- source
> CREATE TABLE tmall_item(
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE,
> proctime AS PROCTIME(),
> WATERMARK FOR onSellTime AS onSellTime - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'filesystem' ,
> 'path' = 'file:///path/to/over-window.csv',
> 'format' = 'csv'
> );
>
> -- sink
> CREATE TABLE print_table (
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE
> ) WITH (
> 'connector' = 'print'
> );
>
> -- insert
> INSERT INTO print_table
> SELECT itemID,
> itemType,
> onSellTime,
> price
> FROM (
> SELECT itemID,
> itemType,
> onSellTime,
> price,
> ROW_NUMBER() OVER (
> PARTITION BY itemID, DATE_FORMAT(proctime, 'MMddHHmm')
> ORDER BY onSellTime DESC
> ) AS row_num
> FROM tmall_item
>  ) WHERE row_num = 1;
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-06-23 11:06:39, "杨光跃" wrote:
> >It should be something like this:
> >
> >
> >1. Step 1: group by the primary key and a time window
> >SELECT <primary key>, TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart FROM
> source_event group by TUMBLE(ts, INTERVAL '10' SECOND), <primary key>
> >
> >
> >2. Step 2: take the top 5 from the result of the previous step
> >select * from (select *, ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY
> <processing-time column>) as rownum from <view from step 1>) where rownum <= 5
> >
> >杨光跃
> >yangguangyuem...@163.com
> >On Jun 23, 2021 at 10:58, casel.chen wrote:
> >Do you mean TopN?
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
> >But I want to know how to write a window topN, which is not the same thing.
> >My requirement is:
> >In a CDC scenario the data for the same primary key changes frequently. I want to define a 5-second processing-time window and take the latest change record per primary key within that window. How can this be done with flink sql 1.12?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >On 2021-06-23 10:18:01, "杨光跃" wrote:
> >
> >
> >Apache Flink 1.12 Documentation: Queries
> >杨光跃
> >yangguangyuem...@163.com
> >On Jun 23, 2021 at 10:09, casel.chen wrote:
> >Please don't post screenshots.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >On 2021-06-23 09:47:46, "杨光跃" wrote:
> >
> >1.12 supports it too.
> >杨光跃
> >yangguangyuem...@163.com
> >On Jun 23, 2021 at 09:45, casel.chen wrote:
> >The official docs show that flink 1.13 supports window topN, but I don't see related docs for flink 1.12. Is there any workaround?
>


Reply: Problem writing to hive with flinksql

2021-06-27 Thread 杨光跃
Writing to hive and then reading it back: I tried it and it works...
Step 1:
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/admin/hive/conf'
);
Step 2:
USE CATALOG myhive;
Step 3:
select * from hive_table;


A guess at the possible cause: in our local deployment the number of slots is 1. Maybe your write job is still running, so there are no free resources to run the read job?


You could stop the write job, or set the dialect with: SET table.sql-dialect=hive;
and then try the query again.






杨光跃
yangguangyuem...@163.com
On Jun 24, 2021 at 18:00, Geoff nie wrote:
Thank you very much! I submit through the sql-client; after modifying the configuration file the job was submitted successfully. The partition file under the hive table is named as follows:
part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2


Two more questions, please:
1. I created the kafka streaming table as follows. Querying kafka_table via flink-sql
returns data, but there is no file on hdfs. Why is that?
2. hive_table has been written successfully as above, but why can't flink-sql or hive read the hive table data? SELECT * FROM
hive_table WHERE dt='2021-06-21' and hr='18';
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='t_kafka_03',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092',
'properties.group.id' = 'testGroup10',
'format'='json'
);




Please help take a look. Many thanks.

















On 2021-06-24 16:12:35, "杨光跃" wrote:


Checkpointing: if you deploy as a jar, you can simply enable it in code. If you submit SQL with the sql-client, you can add the following configuration to the config file
sql-client-defaults.yaml:
configuration:
execution.checkpointing.interval: 1000
杨光跃
yangguangyuem...@163.com
On Jun 24, 2021 at 16:09, Geoff nie wrote:
Thanks a lot for the reply, but after thinking it over I still don't get it. Could you say specifically where to configure the parameter? Thanks!

















On 2021-06-24 14:47:24, "杨光跃" wrote:
Partition commit requires checkpointing to be enabled; you need to configure it.


杨光跃
yangguangyuem...@163.com
On Jun 24, 2021 at 14:44, Geoff nie wrote:
Hi! I have run into this problem too; it is similar to the issue below. Has it been resolved? Many thanks.
















On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:



















 Forwarded message 
From: "潘永克" <13311533...@163.com>
Date: 2021-02-11 11:12:39
To: d...@flink.apache.org
Subject: Problem writing to hive with flinksql

I have a flink question. flinksql can write data into a hive table, but the data under the hive table only exists in
".part-inprogress"-like files. flink 1.12.0
was built against cdh6.2.0; the hive version is 2.1.1 and hadoop is 3.0.0. Screenshots of the problem are as follows:
Create the hive table:
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
Insert data:
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), 
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;


The files never get finalized; they always remain ".part-inprogress..." files.
















Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Wei JI10 季伟
Hi,
The parquet dependencies I added are as follows; I am not sure whether they are complete.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.1</version>
</dependency>






Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Zhiwen Sun
Have you added the parquet-related dependencies?

Zhiwen Sun



On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 
wrote:

> Hi:
> When using the flink sql filesystem connector with format set to parquet, the following exception is thrown:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any format factory for identifier 'parquet' in the classpath.
>at
> org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97)
>at
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72)
>at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119)
>... 41 more
>
> The SQL statements are as follows:
> CREATE TABLE user_info (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (user_id) WITH (
> 'connector' = 'filesystem',
> 'path' = '',
> 'format' = 'parquet'
> );
>
> CREATE TABLE sink_table (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (datetime) WITH (
> 'connector'='filesystem',
> 'path'='',
> 'format'='parquet',
> 'sink.partition-commit.delay'='1h',
> 'sink.partition-commit.policy.kind'='success-file'
> );
>
> insert OVERWRITE sink_table select *, '2021062600' as  datetime from
> user_info;
>


Re: [Question] Why doesn't Flink use Calcite adapter?

2021-06-27 Thread Israel Ekpo
Maybe this question was better addressed to the DEV list.

On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang 
wrote:

>
> 
>
> I have read the design doc of the Flink planner recently. I've found that
> Flink only uses Calcite as an SQL optimizer. It translates an optimized
> RelNode to a Flink (or Blink) RelNode, and then transforms it into the
> physical plan. Why doesn't Flink implement Calcite adapters? Isn't that an
> easier way to use Calcite?
>
>
> The link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html
>


Re: Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
Just found some additional info. It looks like one of the EC2 instances got
terminated at the time the crash happened and this job had 7 Task Managers
running on that EC2 instance. Now I suspect it's possible that when Yarn
tried to migrate the Task Managers, there were no idle containers as this
job was using like 99% of the entire cluster. However in that case
shouldn't Yarn wait for containers to become available? I'm not quite sure
how Flink would behave in this case. Could someone provide some insights
here? Thanks.

Thomas

On Sun, Jun 27, 2021 at 4:24 PM Thomas Wang  wrote:

> Hi,
>
> I recently experienced a job crash due to the underlying Yarn application
> failing for some reason. Here is the only error message I saw. It seems I
> can no longer see any of the Flink job logs.
>
> Application application_1623861596410_0010 failed 1 times (global limit
> =2; local limit is =1) due to ApplicationMaster for attempt
> appattempt_1623861596410_0010_01 timed out. Failing the application.
>
> I was running the Flink job using the Yarn session mode with the following
> command.
>
> export HADOOP_CLASSPATH=`hadoop classpath` &&
> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached
>
> I didn't have HA setup, but I believe the underlying Yarn application
> caused the crash because if, for some reason, the Flink job failed, the
> Yarn application should still survive. Please correct me if this is not the
> right assumption.
>
> My question is how I should find the root cause in this case and what's
> the recommended way to avoid this going forward?
>
> Thanks.
>
> Thomas
>


Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
Hi,

I recently experienced a job crash due to the underlying Yarn application
failing for some reason. Here is the only error message I saw. It seems I
can no longer see any of the Flink job logs.

Application application_1623861596410_0010 failed 1 times (global limit =2;
local limit is =1) due to ApplicationMaster for attempt
appattempt_1623861596410_0010_01 timed out. Failing the application.

I was running the Flink job using the Yarn session mode with the following
command.

export HADOOP_CLASSPATH=`hadoop classpath` &&
/usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached

I didn't have HA setup, but I believe the underlying Yarn application
caused the crash because if, for some reason, the Flink job failed, the
Yarn application should still survive. Please correct me if this is not the
right assumption.

My question is how I should find the root cause in this case and what's the
recommended way to avoid this going forward?

Thanks.

Thomas


flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 Thread Wei JI10 季伟
Hi:
   When using the flink sql filesystem connector with format set to parquet, the following exception is thrown:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
format factory for identifier 'parquet' in the classpath.
   at 
org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97)
   at 
org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72)
   at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119)
   ... 41 more

The SQL statements are as follows:
CREATE TABLE user_info (
`user_id` bigint,
`user_name` string
) PARTITIONED BY (user_id) WITH (
'connector' = 'filesystem',
'path' = '',
'format' = 'parquet'
);

CREATE TABLE sink_table (
`user_id` bigint,
`user_name` string
) PARTITIONED BY (datetime) WITH (
'connector'='filesystem',
'path'='',
'format'='parquet',
'sink.partition-commit.delay'='1h',
'sink.partition-commit.policy.kind'='success-file'
);

insert OVERWRITE sink_table select *, '2021062600' as  datetime from 
user_info;
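
As a side note, independent of the classpath issue above: in Flink SQL the
columns listed in PARTITIONED BY must also be declared in the table schema, so
sink_table probably needs a `datetime` column as well. A hedged sketch of the
adjusted definition (the elided paths are kept as in the original):

CREATE TABLE sink_table (
`user_id` bigint,
`user_name` string,
`datetime` string
) PARTITIONED BY (`datetime`) WITH (
'connector'='filesystem',
'path'='',
'format'='parquet',
'sink.partition-commit.delay'='1h',
'sink.partition-commit.policy.kind'='success-file'
);

With the column declared, the insert statement above (select *, '2021062600'
as datetime from user_info) then matches the three sink columns.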