Re: flink任务挂掉后自动重启

2020-10-29 文章 Congxian Qiu
Hi
1 Flink 的 RestartStrategy[1] 可以解决你的问题吗?
2 从 checkpoint 恢复 这个,可以尝试记录每个作业最新的 checkpoint 地址,也可以在启动的时候从 hdfs 获取一下

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/task_failure_recovery.html
Best,
Congxian


bradyMk  于2020年10月30日周五 上午11:51写道:

>
> flink任务一般都是7*24h在跑的,如果挂掉,有没有什么办法自动重启任务?之前都是任务挂掉然后手动再提交一次任务,但是不可能每次挂掉都可以手动重启;另外,如果对于没做checkpoints的任务,可以通过定时脚本监控yarn,如果任务不存在,则重新提交任务,但是,对于做了checkpoints的任务,我们提交的时候就需要指定ck的目录,这个目录都是在变的,那么又该如何让任务挂掉后能自动重启呢?希望能得到大佬们的指点~
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink cep超时事件的问题

2020-10-29 文章 sunfulin



hi,
session window能处理这种超时事件么?不知道有没有例子可以参考参考哈。














在 2020-10-30 11:12:55,"naisili Yuan"  写道:
> 不知道理解错没有, 感觉你这个场景使用session windows能解决
>
>sunfulin  于2020年10月30日周五 上午11:01写道:
>
>> hi,community,
>> 我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink
>> cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
>> 目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
>> 感谢。


flink任务挂掉后自动重启

2020-10-29 文章 bradyMk
flink任务一般都是7*24h在跑的,如果挂掉,有没有什么办法自动重启任务?之前都是任务挂掉然后手动再提交一次任务,但是不可能每次挂掉都可以手动重启;另外,如果对于没做checkpoints的任务,可以通过定时脚本监控yarn,如果任务不存在,则重新提交任务,但是,对于做了checkpoints的任务,我们提交的时候就需要指定ck的目录,这个目录都是在变的,那么又该如何让任务挂掉后能自动重启呢?希望能得到大佬们的指点~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

minibatch+????ttl??????????????????????

2020-10-29 文章 ????????
hi, all !
flink??1.9??
??select userId,sum(money) as result,ymd from (
select userId,order_id,money,DATE_FORMAT(trans_time,'MMdd') as 
ymd,row_number() over(partition by order_id order by last_modify_time desc) as 
rk from MyTable where type='1'
) t where t.rk = 1 group by userId,ymd;
??tableConfig.setIdleStateRetentionTime(Time.milliseconds(360), 
Time.milliseconds(39)); --1??
??checkpointttl
jira??1??


??1??https://issues.apache.org/jira/browse/FLINK-17096

Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
我把operator chain和streaming
dataflow的概念弄混了,不好意思。我想表达的是在整个任务流程中,选择shuffle的位置对于性能的影响。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 Natasha
Hi Admin,
你说的没错,我错误地import了scala的DataStream,问题已解决!感谢!






Best,
Nat
在2020年10月30日 11:06,admin<17626017...@163.com> 写道:
Hi,
怀疑你import了scala的包,把import部分也贴出来看看呢

2020年10月30日 上午10:19,Natasha <13631230...@163.com> 写道:

Hi,社区~
我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。

Best,
Nat



flink实时流中如何实时获取当前时间

2020-10-29 文章 zjfpla...@hotmail.com
RT,sql方式中怎么实时取当前消息处理的时间,来插入数据库当作当前更新时间?现在用current_timestamp发现不会变,只是第一次的时间



zjfpla...@hotmail.com


Re: flink cep超时事件的问题

2020-10-29 文章 naisili Yuan
 不知道理解错没有, 感觉你这个场景使用session windows能解决

sunfulin  于2020年10月30日周五 上午11:01写道:

> hi,community,
> 我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink
> cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
> 目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
> 感谢。


Re: Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 admin
Hi,
怀疑你import了scala的包,把import部分也贴出来看看呢

> 2020年10月30日 上午10:19,Natasha <13631230...@163.com> 写道:
> 
> Hi,社区~
> 
> 我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。
> 
> Best,
> Nat



flink cep超时事件的问题

2020-10-29 文章 sunfulin
hi,community,
我最近有一个业务场景,需要基于消息流和具体的业务逻辑判断生成超时事件,考虑通过flink 
cep来实现。不过在这个场景中,需要针对输入的消息,判断如果一个小时内没有匹配的数据到来,就需要把该事件输出。
目前的cep机制,应该需要下一个事件消息到来时才会输出事件。想请教下各位大神,针对这个诉求有没有啥好的方案。
感谢。

Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 admin
HI,
operator chain的作用不就是避免shuffle,减少网络间的传输吗?你为什么要手动shuffle呢?

> 2020年10月30日 上午10:24,Husky Zeng <568793...@qq.com> 写道:
> 
> 补充一个细节:
> 
> 
> 当我把shuffle加到cal和sort中间时,
> 
> source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink
> 
> shuffle的数据传输IO速度是3G/s,需要传输的文件大小是370G。
> 
> 当我把shuffle加到SinkConversionToRow和sink中间时,
> 
> source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink
> 
> shuffle的数据传输IO速度是0.1G/s,需要传输的文件大小是250G。
> 
> 
> 文件大小也是有区别的。
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
补充一个细节:


当我把shuffle加到cal和sort中间时,

source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink

shuffle的数据传输IO速度是3G/s,需要传输的文件大小是370G。

当我把shuffle加到SinkConversionToRow和sink中间时,

source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink

shuffle的数据传输IO速度是0.1G/s,需要传输的文件大小是250G。


文件大小也是有区别的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 文章 Husky Zeng
Hi all,

在使用flink的shuffle功能时,我发现在operator chain中不同的位置进行shuffle,IO速度有非常明显的差距。

比如我的这个例子:

source-->cal--->sort--->SinkConversionToRow--->sink

从hive读数据,计算,排序,转化为外部类型行,写入hive。

当我把shuffle加到cal和sort中间时,

source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink

shuffle的数据传输IO速度是3G/s

当我把shuffle加到SinkConversionToRow和sink中间时,

source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink

shuffle的数据传输IO速度是0.1G/s


足足差了30倍!


我猜测这是由于SinkConversionToRow将数据转化为了外部格式,外部格式传输速度慢,内部格式传输速度快。

但是为什么差距这么大?  内部格式如何做到传输速度这么快,外部格式又为什么传输速度这么慢?

SinkConversionToRow代码位置:
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink#translateToTransformation



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 hailongwang
Hi Natasha,
没看到你上传的附件图呢,重新贴下不?


Best,
Hailong Wang




在 2020-10-29 16:52:00,"Natasha" <13631230...@163.com> 写道:



hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激


谢谢!

??????JM??????????????????

2020-10-29 文章 void
hi all
   flinkdatasetapi 
??10jm??, 
jmcli ?? rest 


Flink程序连接Kafka类型不匹配问题

2020-10-29 文章 Natasha


hi,社区~

我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激


谢谢!

Re: Checkpoint size的问题

2020-10-29 文章 Yun Tang
Hi

web UI显示的是增量上传数据量,包括各个task上传的数据,而_metadata 
只是一个元数据,是由JM上传的,所以不能将_metadata与checkpoint UI显示的数据量划等号。

祝好
唐云

From: gsralex 
Sent: Wednesday, October 28, 2020 19:17
To: user-zh@flink.apache.org 
Subject: Checkpoint size的问题

Hi, All
Checkpoint 一般Web UI显示的是400MB左右,但是查看HDFS实际的大小,不到1MB(_metadata) 
,想问下这之间size的偏差为什么这么大?


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 Jark Wu
LEFT JOIN 是会有 delete (retraction)发生的。

On Thu, 29 Oct 2020 at 16:36, LittleFall <1578166...@qq.com> wrote:

> 操作中没有 DELETE 语句也会导致这个问题吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 LittleFall
操作中没有 DELETE 语句也会导致这个问题吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JDBC 并发写入量大时挂掉

2020-10-29 文章 Jark Wu
看起来是这个bug,已经在1.11.3上修复,你可以自己 build 下 release-1.11 分支。
https://issues.apache.org/jira/browse/FLINK-19423

Best,
Jark

On Thu, 29 Oct 2020 at 16:18, LittleFall <1578166...@qq.com> wrote:

> 测试发了10个线程,每个线程1000次,一共1万条记录
>
> 会在写入几千条的时候挂掉
>
> 2020-10-29 12:04:55,573 WARN  org.apache.flink.runtime.taskmanager.Task
>
> [] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
> PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO, EXTE
> RNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
> CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
> CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
> PAYER_CUSTOMER_REQUEST_NO, OPE  RATOR_NAME, CARD_HOLDER_NAME, ID1,
> CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
> MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,
> ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
> PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
> pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
> external_no, STA  TUS0 AS pay_status, STATUS AS order_status,
> PAY_AMOUNT
> AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
> GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS
> productdesc,
> GOODS_DESC AS goods_des  c, CUSTOMER_BIZ_REQUEST_NO AS
> customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
> EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
> CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operato  r,
> CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
> AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS
> payerupdatetime,
> CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS
>
> create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
> coupon_amount]) -> Sink:
> Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
> op_id, order_date, complete_date, payer_customer_request_no, pay_t
> ime,
> customer_request_no, external_no, pay_status, order_status, pay_amount,
> payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
> customer_biz_request_no, goods_ext_info, memo, extend_info,
> channel_ret_code, c  hannel_ret_msg, operator, customer_no,
> customer_name, extend, payercreatetime, payerupdatetime, card_no,
> card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
> (14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING   to FAILED.
> java.io.IOException: Writing records to JDBC failed.
> at
>
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
> ~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>   

JDBC 并发写入量大时挂掉

2020-10-29 文章 LittleFall
测试发了10个线程,每个线程1000次,一共1万条记录

会在写入几千条的时候挂掉

2020-10-29 12:04:55,573 WARN  org.apache.flink.runtime.taskmanager.Task 
  
[] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO, EXTE 
RNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
PAYER_CUSTOMER_REQUEST_NO, OPE  RATOR_NAME, CARD_HOLDER_NAME, ID1,
CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,  
ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
external_no, STA  TUS0 AS pay_status, STATUS AS order_status, PAY_AMOUNT
AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS productdesc,
GOODS_DESC AS goods_des  c, CUSTOMER_BIZ_REQUEST_NO AS
customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operato  r,
CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS payerupdatetime,
CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS  
create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
coupon_amount]) -> Sink:
Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
op_id, order_date, complete_date, payer_customer_request_no, pay_t  ime,
customer_request_no, external_no, pay_status, order_status, pay_amount,
payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
customer_biz_request_no, goods_ext_info, memo, extend_info,
channel_ret_code, c  hannel_ret_msg, operator, customer_no,
customer_name, extend, payercreatetime, payerupdatetime, card_no,
card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
(14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING   to FAILED.
java.io.IOException: Writing records to JDBC failed.
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.output(StreamingJoinOperator.java:305)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at

Re: flink1.11 elasticsearch connector

2020-10-29 文章 Yangze Guo
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
>
> elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?


Flink消费LDAP Kafka

2020-10-29 文章 hua mulan
Flink kafka connector可以消费开了LDAP的Kafka吗

来自 Outlook


flink1.11 elasticsearch connector

2020-10-29 文章 赵帅
elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?

Re: flink1.11 kafka connector

2020-10-29 文章 Jark Wu
多谢创建 issue。

side comment: 1.12 中 kafka connector 将支持声明 message key 部分,当声明了 message key
部分,就自动会按照 key 来做 hash 到某个固定分区。

Best,
Jark

On Thu, 29 Oct 2020 at 14:27, Dream-底限  wrote:

> hi、
> 好的,https://issues.apache.org/jira/browse/FLINK-19871
>
> Jark Wu  于2020年10月29日周四 下午12:06写道:
>
> > 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
> >
> > > hi、
> > > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> > >
> > >- fixed:每个Flink分区最多只能有一个Kafka分区。
> > >- round-robin:Flink分区循环分配给Kafka分区。
> > >
> >
>


Re: flink1.11 kafka connector

2020-10-29 文章 Dream-底限
hi、
好的,https://issues.apache.org/jira/browse/FLINK-19871

Jark Wu  于2020年10月29日周四 下午12:06写道:

> 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
>
> Best,
> Jark
>
>
> On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
>
> > hi、
> > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> >
> >- fixed:每个Flink分区最多只能有一个Kafka分区。
> >- round-robin:Flink分区循环分配给Kafka分区。
> >
>