Re: flinksql join

2022-11-10 Thread Jason_H
I tried using a regular join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
"DIM_ACCOUNT_ID  string ,\n" +
"GMT_CREATE  string ,\n" +
"ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
"GMT_UPDATE  string ,\n" +
"ACCT_CODE  string ,\n" +
"CUST_ID  string ,\n" +
"CUST_NAME  string ,\n" +
"CORP_ID  string ,\n" +
"CORP_CERT_CODE  string ,\n" +
"CORP_CERT_TYPE  string ,\n" +
"CUST_MANAGER_JOB_CODE  string ,\n" +
"TEAM_CODE  string ,\n" +
"ORG_ID  string, \n" +
"SUPER_ORG_ID  string, \n" +
"IS_OUTSIDE  BIGINT \n" +
") \n" +
"WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = '***',\n" +
"  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"  'username' = 'root',\n" +
"  'password' = '123456',\n" +
//"  'lookup.cache.ttl' = '1s', \n" +
"  'table-name' = 'dob_dim_account' \n" +
//"  'lookup.cache.max-rows' = '1000' \n" +
//"  'lookup.cache.ttl' = '1 minute',\n" +
//"  'lookup.max-retries' = '3' \n" +
" )";


However, this approach does not seem to solve my earlier problem, and when a new account is added, the newly arriving data still cannot be joined to it.
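With the 'jdbc' connector, a regular join reads dob_dim_account once as a bounded scan when the job starts, so rows inserted into MySQL afterwards are never seen by the running job. One way to let a regular join react to late-arriving dimension rows is to read the table as a changelog. A minimal sketch, assuming the MySQL CDC connector (flink-connector-mysql-cdc) is available and that tableEnv is an existing StreamTableEnvironment; hostname, database name and credentials are placeholders:

// Sketch only: declare the dimension table as a changelog (CDC) source so the
// regular LEFT JOIN above also sees accounts inserted after the job has started.
String accountDimCdc = "CREATE TABLE dob_dim_account_cdc (\n" +
    "  ACCT_CODE STRING,\n" +
    "  CORP_ID STRING,\n" +
    "  PRIMARY KEY (ACCT_CODE) NOT ENFORCED\n" +
    ") WITH (\n" +
    "  'connector' = 'mysql-cdc',\n" +
    "  'hostname' = '***',\n" +
    "  'port' = '3306',\n" +
    "  'username' = 'root',\n" +
    "  'password' = '***',\n" +
    "  'database-name' = '***',\n" +
    "  'table-name' = 'dob_dim_account'\n" +
    ")";
tableEnv.executeSql(accountDimCdc);
// Joining dws_a2a_trade_year_index against this table, the aggregation emits an
// updated result once the matching account row arrives, instead of leaving the
// earlier records permanently unmatched.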



Jason_H
hyb_he...@163.com


Re: flinksql join

2022-11-10 Thread Zhiwen Sun
Use a regular join, not a lookup join.

Zhiwen Sun



On Fri, Nov 11, 2022 at 11:10 AM Jason_H  wrote:

>
>
> Hi all,
>
> I am using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is in MySQL (accounts). I have run into a problem: when a record arrives from Kafka and its account is not yet in the dimension table, I insert the account manually; the next record that arrives can then be matched to the account info, but the accumulated output is missing the record that was not matched. For example:
> Kafka input:
> Account  Amount  Count
>          100     1   -> not matched
>          100     1   -> not matched
>          100     1   -> matched
>
> Dimension table:
> Account  Corp
>
>          -> account row inserted later
> Actual output:
> Corp  Amount  Count
>       100     1
>
>
> What I want:
> Corp  Amount  Count
>       300     3
>
>
>
>
>
> The SQL is as follows:
> String sql2 =  "insert into dws_b2b_trade_year_index\n" +
>"WITH temp AS (\n" +
>"select \n" +
>"  ta.gmtStatistical as gmtStatistical,\n" +
>"  ta.paymentMethod as paymentMethod,\n" +
>"  tb.CORP_ID as outCorpId,\n" +
>"  tc.CORP_ID as inCorpId,\n" +
>"  sum(ta.tradeAmt) as tranAmount,\n" +
>"  sum(ta.tradeCnt) as tranNum \n" +
>"from dws_a2a_trade_year_index ta \n" +
>"left join dob_dim_account for system_time as of ta.proc as
> tb on ta.outAcctCode = tb.ACCT_CODE \n" +
>"left join dob_dim_account for system_time as of ta.proc as
> tc on ta.inAcctCode = tc.ACCT_CODE \n" +
>"group by \n" +
>" ta.gmtStatistical, \n" +
>" ta.paymentMethod, \n" +
>" tb.CORP_ID, \n" +
>" tc.CORP_ID \n" +
>") \n" +
>"SELECT \n" +
>"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
> gmtUpdate, \n" +
>"   gmtStatistical, \n" +
>"   paymentMethod, \n" +
>"   outCorpId, \n" +
>"   inCorpId, \n" +
>"   tranAmount, \n" +
>"   tranNum \n" +
>"FROM temp";
>
> Jason_H
> hyb_he...@163.com


Email from 古月

2022-11-10 Thread 古月
Unsubscribe

Re: Task Manager restart and RocksDB incremental checkpoints issue.

2022-11-10 Thread Yanfei Lei
Hi Vidya Sagar,

Could you please share the reason for the TaskManager restart? If the machine
or the TaskManager's JVM process crashes, the `RocksDBKeyedStateBackend` can't
be disposed/closed normally, so the existing RocksDB instance directory
would remain.

BTW, if you use Application Mode on Kubernetes and a TaskManager (pod) crashes,
the RocksDB directory is deleted when the pod is released.

Best,
Yanfei

Vidya Sagar Mula  于2022年11月11日周五 01:39写道:

> Hi,
>
> I am using RocksDB state backend for incremental checkpointing with Flink
> 1.11 version.
>
> Question:
> --
> For a given Job ID, intermediate RocksDB checkpoints are stored under the
> path defined with ""
>
> The files are stored with "_jobID + random UUID" prefixed to the location.
>
> Case : 1
> -
> - When I cancel the job, then all the rocksDB checkpoints are deleted
> properly from the location corresponding to that JobId.
> (based on "instanceBasePath" variable stored in RocksDBKeyedStateBackend
> object).
> "NO Issue here. Working as expected".
>
> Case : 2
> -
> - When my TaskManager is restarted, the existing RocksDB checkpoints are
> not deleted.
> A new "instanceBasePath" is constructed with a new random UUID appended to
> the directory.
> And the old checkpoint directories are still there.
>
> Questions:
> - Is it expected behaviour not to delete the existing checkpoint
> dirs under the RocksDB local directory?
> - I see "StreamTaskStateInitializerImpl.java", where new StateBackend
> objects are created. In this case, a new directory is created for this Job ID
> with a new random UUID appended.
> What happens to the old directories? Are they going to be purged later on?
> If not, the disk is going to fill up with the older checkpoints. Please
> clarify this.
>
> Thanks,
> Vidya Sagar.
>


flinksql join

2022-11-10 Thread Jason_H


Hi all,
I am using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is in MySQL (accounts). I have run into a problem: when a record arrives from Kafka and its account is not yet in the dimension table, I insert the account manually; the next record that arrives can then be matched to the account info, but the accumulated output is missing the record that was not matched. For example:
Kafka input:
Account  Amount  Count
         100     1   -> not matched
         100     1   -> not matched
         100     1   -> matched

Dimension table:
Account  Corp

         -> account row inserted later
Actual output:
Corp  Amount  Count
      100     1


What I want:
Corp  Amount  Count
      300     3




The SQL is as follows:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
    "WITH temp AS (\n" +
    "select \n" +
    "  ta.gmtStatistical as gmtStatistical,\n" +
    "  ta.paymentMethod as paymentMethod,\n" +
    "  tb.CORP_ID as outCorpId,\n" +
    "  tc.CORP_ID as inCorpId,\n" +
    "  sum(ta.tradeAmt) as tranAmount,\n" +
    "  sum(ta.tradeCnt) as tranNum \n" +
    "from dws_a2a_trade_year_index ta \n" +
    "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
    "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
    "group by \n" +
    " ta.gmtStatistical, \n" +
    " ta.paymentMethod, \n" +
    " tb.CORP_ID, \n" +
    " tc.CORP_ID \n" +
    ") \n" +
    "SELECT \n" +
    "   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
    "   gmtStatistical, \n" +
    "   paymentMethod, \n" +
    "   outCorpId, \n" +
    "   inCorpId, \n" +
    "   tranAmount, \n" +
    "   tranNum \n" +
    "FROM temp";

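For reference, the JDBC connector used for dob_dim_account also supports a lookup cache; the option names below are the ones left commented out in the JDBC DDL earlier in this thread, with example values. A lookup join probes MySQL per incoming record, and with a cache enabled a result is reused until the TTL expires; either way, rows that were already emitted without a match are not retroactively corrected, which is the gap described above. A minimal sketch (url and credentials are placeholders):

// Sketch only: the same JDBC dimension table with the lookup-cache options made explicit.
String accountDimLookup = "CREATE TABLE dob_dim_account (\n" +
    "  ACCT_CODE STRING,\n" +
    "  CORP_ID STRING\n" +
    ") WITH (\n" +
    "  'connector' = 'jdbc',\n" +
    "  'url' = '***',\n" +
    "  'table-name' = 'dob_dim_account',\n" +
    "  'lookup.cache.max-rows' = '1000',\n" +
    "  'lookup.cache.ttl' = '1 minute',\n" +
    "  'lookup.max-retries' = '3'\n" +
    ")";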
Jason_H
hyb_he...@163.com

Re: Any caveats about processing abstract classes ?

2022-11-10 Thread Gen Luo
Hi Davide,

I suppose it would be fine. The only difference I can think of that may
matter is serialization. Flink uses KryoSerializer as the fallback
serializer if the TypeInformation of the records is not provided, and it can
properly process abstract classes. This works well in most cases.

For better compatibility you may want to use a customized serializer. In
such cases you can call SingleOutputStreamOperator#returns(TypeInformation)
with your TypeInformation like:
input.map(new MapToAnimal()).returns(new AnimalTypeInformation())
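
A minimal, self-contained sketch of this pattern (the Animal/Dog/Cat types follow the question below; TypeInformation.of(...) treats the abstract class as a generic type, which falls back to Kryo):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AbstractTypeExample {
    public abstract static class Animal { public String name; }
    public static class Dog extends Animal {}
    public static class Cat extends Animal {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Animal> animals = env
                .fromElements("dog", "cat", "dog")
                .map(s -> "dog".equals(s) ? (Animal) new Dog() : new Cat())
                // Declare the common supertype explicitly; the abstract class is
                // handled as a generic type and serialized with Kryo.
                .returns(TypeInformation.of(Animal.class));

        // Functions written against the supertype keep working for all subclasses.
        animals.filter(a -> a instanceof Dog).print();

        env.execute("abstract-type-example");
    }
}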


On Thu, Nov 10, 2022 at 9:02 AM Davide Bolcioni via user <
user@flink.apache.org> wrote:

> Greetings,
> I am looking at a Flink pipeline processing events consumed from a Kafka
> topic, which now needs to also consume events which have a different, but
> related, schema. Traditional Java OOP would suggest transitioning from
>
> class Dog { ... }
> new FilterFunction<Dog> { ... }
>
> to
>
> abstract class Animal { ... }
> class Dog extends Animal { ... }
> class Cat extends Animal { ... }
> new FilterFunction<Animal> { ... }
>
> but I am wondering if there is anything that might surprise the unwary
> down that road, considering that the code base also uses asynchronous
> functions and the broadcast pattern.
>
> Thank you in advance,
> Davide Bolcioni
> --
> There is no place like /home
>


Spark Scala Contract Opportunity @USA

2022-11-10 Thread sri hari kali charan Tummala
Hi All,

Is anyone looking for a Spark Scala contract role inside the USA? A company
called Maxonic has an open Spark Scala contract position (100% remote)
inside the USA. If anyone is interested, please send your CV to
kali.tumm...@gmail.com.

Thanks & Regards
Sri Tummala


Task Manager restart and RocksDB incremental checkpoints issue.

2022-11-10 Thread Vidya Sagar Mula
Hi,

I am using RocksDB state backend for incremental checkpointing with Flink
1.11 version.
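
For reference, a minimal sketch of how such a setup is typically wired up in Flink 1.11 (the checkpoint URI and local path are placeholders); the "instanceBasePath" discussed below is created under the configured local storage path:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalRocksDbSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Durable checkpoint location; 'true' enables incremental checkpoints.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        // Local working directory for the RocksDB instances; each backend creates
        // a "<jobId>_<random UUID>" subdirectory under this path.
        backend.setDbStoragePath("/tmp/flink-rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000);
        // ... define sources and operators, then env.execute("...");
    }
}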

Question:
--
For a given Job ID, intermediate RocksDB checkpoints are stored under the
path defined with ""

The files are stored with "_jobID + random UUID" prefixed to the location.

Case : 1
-
- When I cancel the job, then all the rocksDB checkpoints are deleted
properly from the location corresponding to that JobId.
(based on "instanceBasePath" variable stored in RocksDBKeyedStateBackend
object).
"NO Issue here. Working as expected".

Case : 2
-
- When my TaskManager is restarted, the existing RocksDB checkpoints are not
deleted.
A new "instanceBasePath" is constructed with a new random UUID appended to
the directory.
And the old checkpoint directories are still there.

Questions:
- Is it expected behaviour not to delete the existing checkpoint
dirs under the RocksDB local directory?
- I see "StreamTaskStateInitializerImpl.java", where new StateBackend
objects are created. In this case, a new directory is created for this Job ID
with a new random UUID appended.
What happens to the old directories? Are they going to be purged later on?
If not, the disk is going to fill up with the older checkpoints. Please
clarify this.

Thanks,
Vidya Sagar.


Re: [ANNOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Ryan Skraba
Excellent news -- welcome to the new era of easier, more timely and more
feature-rich releases for everyone!

Great job!  Ryan

On Thu, Nov 10, 2022 at 3:15 PM Leonard Xu  wrote:

> Thanks Chesnay and Martijn for the great work!   I believe the
> flink-connector-shared-utils[1] you built will help Flink connector
> developers a lot.
>
>
> Best,
> Leonard
> [1] https://github.com/apache/flink-connector-shared-utils
>
> On 2022-11-10 at 9:53 PM, Martijn Visser wrote:
>
> Really happy with the first externalized connector for Flink. Thanks a lot
> to all of you involved!
>
> On Thu, Nov 10, 2022 at 12:51 PM Chesnay Schepler 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink Elasticsearch Connector 3.0.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data
>> streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> This release marks the first time we have released a connector
>> separately from the main Flink release.
>> Over time more connectors will be migrated to this release model.
>>
>> This release is equivalent to the connector version released alongside
>> Flink 1.16.0 and acts as a drop-in replacement.
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12352291
>>
>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>>
>> Regards,
>> Chesnay
>>
>
>


Re: [ANNOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Leonard Xu
Thanks Chesnay and Martijn for the great work!   I believe the 
flink-connector-shared-utils[1] you built will help Flink connector developers 
a lot.


Best,
Leonard
[1] https://github.com/apache/flink-connector-shared-utils

> On 2022-11-10 at 9:53 PM, Martijn Visser wrote:
> 
> Really happy with the first externalized connector for Flink. Thanks a lot to 
> all of you involved!
> 
> On Thu, Nov 10, 2022 at 12:51 PM Chesnay Schepler wrote:
> The Apache Flink community is very happy to announce the release of 
> Apache Flink Elasticsearch Connector 3.0.0.
> 
> Apache Flink® is an open-source stream processing framework for 
> distributed, high-performing, always-available, and accurate data 
> streaming applications.
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
> 
> This release marks the first time we have released a connector 
> separately from the main Flink release.
> Over time more connectors will be migrated to this release model.
> 
> This release is equivalent to the connector version released alongside 
> Flink 1.16.0 and acts as a drop-in replacement.
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12352291 
> 
> 
> We would like to thank all contributors of the Apache Flink community 
> who made this release possible!
> 
> Regards,
> Chesnay



Re: [ANNOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Martijn Visser
Really happy with the first externalized connector for Flink. Thanks a lot
to all of you involved!

On Thu, Nov 10, 2022 at 12:51 PM Chesnay Schepler 
wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Elasticsearch Connector 3.0.0.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> This release marks the first time we have released a connector
> separately from the main Flink release.
> Over time more connectors will be migrated to this release model.
>
> This release is equivalent to the connector version released alongside
> Flink 1.16.0 and acts as a drop-in replacement.
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12352291
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>



Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-10 Thread Jing Ge
Hi Etienne,

Nice blog! Thanks for sharing!

Best regards,
Jing


On Wed, Nov 9, 2022 at 5:49 PM Etienne Chauchot 
wrote:

> Hi Yun Gao,
>
> FYI I just updated the article after your review:
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
> On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>
> Hi Yun Gao,
>
> thanks for your email and your review !
>
> My comments are inline
> On 08/11/2022 at 06:51, Yun Gao wrote:
>
> Hi Etienne,
>
> Many thanks for the article! Flink is indeed continuing to increase its
> ability for unified batch / stream processing with the same API, and it's a
> great pleasure that more and more users are trying this functionality. But I
> also have some questions regarding some details.
>
> First, IMO, in the long run Flink as a whole will have two unified APIs,
> namely the Table / SQL API and the DataStream API. Users can express their
> computation logic with these two APIs for both bounded and unbounded data
> processing.
>
>
> Yes, that is also what I understood from the discussions and JIRAs.
> And IMHO I also think that reducing the number of APIs to 2 was the right
> move.
>
>
> Under the hood Flink provides two execution modes: the streaming mode works
> with both bounded and unbounded data, and it executes as incremental,
> state-based processing; the batch mode works only with bounded data, and it
> executes level by level, similar to traditional batch processing frameworks.
> Users can switch the execution mode via EnvironmentSettings.inBatchMode() or
> StreamExecutionEnvironment.setRuntimeMode().
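>
> A minimal sketch of these two switches (assuming a Flink version where
> RuntimeExecutionMode is available; the variable names are only illustrative):
>
> // DataStream API: run a bounded pipeline with the batch runtime.
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> // Table API / SQL: create the environment directly in batch mode.
> EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);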
>
> As recommended in the Flink docs (1), I have enabled batch mode as I thought
> it would be more efficient on my bounded pipeline, but as a matter of fact
> the streaming mode seems to be more efficient for my use case. I'll test
> with higher volumes to confirm.
>
>
>
> Specifically for DataStream, as implemented in FLIP-140, currently all the
> existing DataStream operations support the batch execution mode in a unified
> way [1]: data is sorted on the keyBy() edges according to the key, so the
> following operations like reduce() receive all the data belonging to the same
> key consecutively and can reduce the records of a key directly, without
> maintaining intermediate state. In this way users can write the same code for
> both streaming and batch processing.
>
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream pipeline
> will work with no modification if I plug an unbounded source into it.
>
>
>
> # Regarding the migration of Join / Reduce
>
> First, I think reduce is always supported and users can write
> dataStream.keyBy().reduce(xx) directly; if the batch execution mode is set,
> the reduce is not executed in an incremental way, but instead acts much like
> sort-based aggregation in a traditional batch processing framework.
>
> Regarding join, the issue of FLINK-22587 indeed exists: the current join has
> to be bound to a window and the GlobalWindow does not work properly. But with
> a bit more effort users do not need to re-write the whole join from scratch:
> they can write a dedicated window assigner that assigns all the records to
> the same window instance and returns EventTimeTrigger.create() as the default
> event-time trigger [2]. Then the following works:
>
> source1.join(source2)
> .where(a -> a.f0)
> .equalTo(b -> b.f0)
> .window(new EndOfStreamWindows())
> .apply();
>
> It does not require records to have event time attached, since the window
> trigger relies only on the time range of the window, and the assignment does
> not need event time either.
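>
> A minimal sketch of such a window assigner (assuming the DataStream windowing
> API; the class name matches the EndOfStreamWindows used in the snippet above):
>
> import java.util.Collection;
> import java.util.Collections;
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> // Assigns every record to one window covering the whole time range, so the
> // event-time trigger only fires once the input ends (watermark = MAX_VALUE).
> public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
>
>     private static final TimeWindow ALL = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
>
>     @Override
>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>         return Collections.singletonList(ALL);
>     }
>
>     @Override
>     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
>         return EventTimeTrigger.create();
>     }
>
>     @Override
>     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
>         return new TimeWindow.Serializer();
>     }
>
>     @Override
>     public boolean isEventTime() {
>         return true;
>     }
> }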
>
> The behavior of the join is also similar to a sort-based join if batch mode
> is enabled.
>
> Of course it is not ideal to ask users to apply this workaround, and we'll
> try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that I
> proposed. I tried it and it works perfectly with similar performance.
> Thanks.
>
>
> # Regarding support of Sort / Limit
>
> Currently these two operators are indeed not supported directly in the
> DataStream API. One initial thought for these two operations is that users
> may convert the DataStream to the Table API and use the Table API for these
> two operators:
>
> DataStream xx = ... // Keeps the customized logic in DataStream
> Table tableXX = tableEnv.fromDataStream(dataStream);
> tableXX.orderBy($("a").asc());
>
>
> Yes, I knew that workaround but decided not to use it because I have a
> separate SQL-based implementation (for comparison purposes), so I did not
> want to mix the SQL and DataStream APIs in the same pipeline.
>
>
> What do you think about this option? We are also assessing whether the
> combination of the DataStream API / Table API is sufficient for all the batch 

[ANNOUNCE] Apache Flink Elasticsearch Connector 3.0.0 released

2022-11-10 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink Elasticsearch Connector 3.0.0.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

This release marks the first time we have released a connector 
separately from the main Flink release.

Over time more connectors will be migrated to this release model.

This release is equivalent to the connector version released alongside 
Flink 1.16.0 and acts as a drop-in replacement.


The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352291

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay

