[Urgent] When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?

2023-02-16 Thread casel.chen
The job reads a Kafka CDC source, joins several Redis dimension tables, then does a regular inner join with another stream table, and finally writes the widened result to MongoDB. We are using Flink 1.13.2 in SQL mode, with debug logging enabled.
Testing shows that records with the same primary key were logged on different TaskManagers (I log inside the Sink Function's invoke method), and this behaviour makes the data in the final result table incorrect.


Questions:
1. When Flink SQL writes to a table with a primary key, are records with the same primary key guaranteed to be processed by the same TaskManager?
2. Is this because our Flink version is too old? If so, from which Flink version is it supported?
3. My understanding is that the point of defining a primary key on a Flink SQL result table is to shuffle by that key, so that records with the same primary key are handled by the same TaskManager and the change order stays correct. Is that understanding right?
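For context on what a key-based shuffle guarantees: once an exchange is hash-partitioned on a key, routing is deterministic, so equal keys always land on the same parallel subtask. A simplified sketch of that invariant (plain Java; `KeyRouting` and `subtaskFor` are hypothetical names, and Flink's actual `KeyGroupRangeAssignment` additionally applies a murmur hash to the key's `hashCode`):

```java
// Simplified model of a hash-partitioned exchange: records with the same key
// are always routed to the same parallel subtask. Flink's real logic lives in
// KeyGroupRangeAssignment and additionally murmur-hashes the key's hashCode;
// this sketch is illustrative only.
class KeyRouting {
    static int subtaskFor(String primaryKey, int maxParallelism, int parallelism) {
        // key -> key group (stable for a given key)
        int keyGroup = Math.floorMod(primaryKey.hashCode(), maxParallelism);
        // key group -> one operator subtask
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int first = subtaskFor("pk-42", 128, 4);
        int second = subtaskFor("pk-42", 128, 4);
        System.out.println(first == second); // prints true: same key, same subtask
    }
}
```

Whether the planner actually inserts such a shuffle before the sink in a given Flink version is exactly the question being asked here; the sketch only shows what the shuffle would guarantee if present.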



Re: Imbalance in Garbage Collection for TaskManagers

2023-02-16 Thread Meghajit Mazumdar
Hi Weihua, Shammon,

> What kind of session cluster are you using? Standalone or Native?
We are running a native deployment on Kubernetes. Is there any optimization
we can do?

> Maybe you need to dump memory and analyze the usage if there are no other
> obvious problems
I was thinking of enabling task manager memory logs, like what is mentioned
here [1].
However, I wanted to know whether enabling these has any repercussions on
the throughput of the job. Does it cause a large overhead on the CPU and/or
memory?


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/#analyzing-memory--garbage-collection-behaviour
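For what it's worth, the page in [1] enables this via JVM options; a sketch of what that could look like for TaskManagers (the flag set is an assumption, JDK 8 style, and should be checked against the JVM your image ships):

```yaml
# flink-conf.yaml (sketch): verbose GC logging for TaskManagers
env.java.opts.taskmanager: "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/opt/flink/log/gc.log"
```

GC logging itself is generally cheap; the main cost is the disk I/O for the log file, so the throughput impact is usually small.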

Regards,
Meghajit

On Fri, Feb 17, 2023 at 8:10 AM Weihua Hu  wrote:

> Hi, Meghajit
>
> What kind of session cluster are you using? Standalone or Native?
> If it's standalone, maybe you can check if TaskManager with heavy gc is
> running more tasks than others. If so, we can enable
> "cluster.evenly-spread-out-slots=true" to balance tasks in all task
> managers.
>
> Best,
> Weihua
>
>
> On Thu, Feb 16, 2023 at 10:52 PM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We have a Flink session cluster deployment in Kubernetes of around 100
>> TaskManagers. It processes around 20-30 Kafka Source jobs at the moment.
>> The jobs run are all using the same jar and only differ in the SQL query
>> used and other UDFs. We are using the official flink:1.14.3 image.
>>
>> We observed that one specific task manager has been doing more garbage
>> collection compared to the others. So much so, in fact, that at a specific
>> hour of the day it pauses execution to do GC and thus causes huge consumer
>> lag to build up. By garbage collection, I mean GC of the Young Generation;
>> the Old Generation GC looks fine.
>>
>> We checked our other running Flink clusters and found that most of them
>> actually show this behaviour. In fact, there are always 2-3 TaskManagers
>> which seem to be doing more GC than the others.
>>
>> Is this a known issue? Our clusters run long-running Kafka-source-to-Kafka-sink
>> jobs, so we wanted to know if this can happen because of that.
>>
>> Would appreciate any kind of guidance.
>> --
>> *Regards,*
>> *Meghajit*
>>
>

-- 
*Regards,*
*Meghajit*


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Shammon FY
Hi jing,

It sounds good to me, we can add an option for it

Best,
Shammon


On Fri, Feb 17, 2023 at 3:13 PM Jing Ge  wrote:

> Hi,
>
> It makes sense to offer this feature of catching and ignoring the exception,
> with a config switch to turn it on/off, when we put ourselves in users'
> shoes. WDYT? I will create a ticket if most of you consider it a good
> feature to help users.
>
> Best regards,
> Jing
>
> On Fri, Feb 17, 2023 at 6:01 AM Shammon FY  wrote:
>
>> Hi Hatem
>>
>> As mentioned above, you can extend the KafkaSink or create a udf and
>> process the record before sink
>>
>> Best,
>> Shammon
>>
>> On Fri, Feb 17, 2023 at 9:54 AM yuxia 
>> wrote:
>>
>>> Hi, Hatem.
>>> I think there is no way to catch the exception and then ignore it in
>>> current implementation for KafkaSink.  You may also need to extend the
>>> KafkaSink.
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> --
>>> *From: *"Hatem Mostafa" 
>>> *To: *"User" 
>>> *Sent: *Thursday, Feb 16, 2023, 9:32:44 PM
>>> *Subject: *KafkaSink handling message size produce errors
>>>
>>> Hello,
>>> I am writing a flink job that reads and writes into kafka, it is using a
>>> window operator and eventually writing the result of the window into a
>>> kafka topic. The accumulated data can exceed the maximum message size after
>>> compression on the producer level. I want to be able to catch the exception
>>> coming from the producer and ignore this window. I could not find a way to
>>> do that in KafkaSink, is there a way to do so?
>>>
>>> I attached here an example of an error that I would like to handle
>>> gracefully.
>>>
>>> [image: image.png]
>>>
>>>
>>> This question is similar to one that was asked on stackoverflow here, but
>>> the answer is relevant for older versions of flink.
>>>
>>> Regards,
>>> Hatem
>>>
>>>


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Jing Ge via user
Hi,

It makes sense to offer this feature of catching and ignoring the exception,
with a config switch to turn it on/off, when we put ourselves in users'
shoes. WDYT? I will create a ticket if most of you consider it a good
feature to help users.

Best regards,
Jing

On Fri, Feb 17, 2023 at 6:01 AM Shammon FY  wrote:

> Hi Hatem
>
> As mentioned above, you can extend the KafkaSink or create a udf and
> process the record before sink
>
> Best,
> Shammon
>
> On Fri, Feb 17, 2023 at 9:54 AM yuxia  wrote:
>
>> Hi, Hatem.
>> I think there is no way to catch the exception and then ignore it in
>> current implementation for KafkaSink.  You may also need to extend the
>> KafkaSink.
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"Hatem Mostafa" 
>> *To: *"User" 
>> *Sent: *Thursday, Feb 16, 2023, 9:32:44 PM
>> *Subject: *KafkaSink handling message size produce errors
>>
>> Hello,
>> I am writing a flink job that reads and writes into kafka, it is using a
>> window operator and eventually writing the result of the window into a
>> kafka topic. The accumulated data can exceed the maximum message size after
>> compression on the producer level. I want to be able to catch the exception
>> coming from the producer and ignore this window. I could not find a way to
>> do that in KafkaSink, is there a way to do so?
>>
>> I attached here an example of an error that I would like to handle
>> gracefully.
>>
>> [image: image.png]
>>
>>
>> This question is similar to one that was asked on stackoverflow here, but
>> the answer is relevant for older versions of flink.
>>
>> Regards,
>> Hatem
>>
>>


Re: DataStream Join produces no output and causes program crash

2023-02-16 Thread Shuiqiang Chen
Hi Reme,

The code you provided seems good to me. Maybe you can add some logs in the
getKey() and join() function for debug purpose to observe whether there was
any successfully joined record. By the way, the metrics in WebUI dashboard
might be of good help.

Best,
Shuiqiang

Reme Ajayi wrote on Thu, Feb 16, 2023 at 22:20:

> Hi,
> I am trying to join two Kafka data streams and output the result to another
> Kafka topic, however my joined stream does not output any data. After some
> time, my program crashes and runs out of memory, which I think is a result
> of the join not working. My code doesn't throw any errors, but the joins
> don't produce any output. My join logic is below; please suggest possible
> solutions.
> P.S. Things I have tried so far:
>
>1. Increased task slots on the task manager
>2. Added Watermarks to my Kafka sources
>
>   DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
>           .where(new KeySelector<GenericRecord, String>() {
>               @Override
>               public String getKey(GenericRecord value) throws Exception {
>                   return value.get("la_id").toString();
>               }
>           })
>           .equalTo(new KeySelector<GenericRecord, String>() {
>               @Override
>               public String getKey(GenericRecord value) throws Exception {
>                   return value.get("id").toString();
>               }
>           })
>           .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>           .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
>               @Override
>               public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
>                   return new Enhanced(
>                           Long.parseLong(first.get("c_at").toString()),
>                           first.get("c_type").toString(),
>                           first.get("id").toString(),
>                           Integer.parseInt(first.get("d_cts").toString()),
>                           Integer.parseInt(first.get("c_cts").toString()),
>                           second.get("prov").toString(),
>                           second.get("bb_S_T").toString(),
>                           second.get("p_id").toString(),
>                           second.get("s_ccurr").toString());
>               }
>           });
>
>


Re: Imbalance in Garbage Collection for TaskManagers

2023-02-16 Thread Shammon FY
Hi

Maybe you need to dump memory and analyze the usage if there are no other
obvious problems

Best,
Shammon

On Fri, Feb 17, 2023 at 10:41 AM Weihua Hu  wrote:

> Hi, Meghajit
>
> What kind of session cluster are you using? Standalone or Native?
> If it's standalone, maybe you can check if TaskManager with heavy gc is
> running more tasks than others. If so, we can enable
> "cluster.evenly-spread-out-slots=true" to balance tasks in all task
> managers.
>
> Best,
> Weihua
>
>
> On Thu, Feb 16, 2023 at 10:52 PM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We have a Flink session cluster deployment in Kubernetes of around 100
>> TaskManagers. It processes around 20-30 Kafka Source jobs at the moment.
>> The jobs run are all using the same jar and only differ in the SQL query
>> used and other UDFs. We are using the official flink:1.14.3 image.
>>
>> We observed that one specific task manager has been doing more garbage
>> collection compared to the others. So much so, in fact, that at a specific
>> hour of the day it pauses execution to do GC and thus causes huge consumer
>> lag to build up. By garbage collection, I mean GC of the Young Generation;
>> the Old Generation GC looks fine.
>>
>> We checked our other running Flink clusters and found that most of them
>> actually show this behaviour. In fact, there are always 2-3 TaskManagers
>> which seem to be doing more GC than the others.
>>
>> Is this a known issue? Our clusters run long-running Kafka-source-to-Kafka-sink
>> jobs, so we wanted to know if this can happen because of that.
>>
>> Would appreciate any kind of guidance.
>> --
>> *Regards,*
>> *Meghajit*
>>
>


Re: KafkaSink handling message size produce errors

2023-02-16 Thread Shammon FY
Hi Hatem

As mentioned above, you can extend the KafkaSink or create a UDF and
process the record before the sink

Best,
Shammon
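A minimal sketch of the "process the record before sink" idea: measure the serialized payload and drop (or side-output) anything that would exceed the broker limit. The class and constant names are hypothetical; the ~1 MB constant mirrors Kafka's default message size limit, and note this checks the pre-compression size, which is a conservative bound:

```java
// Sketch: guard records before they reach the KafkaSink. Hypothetical names;
// MAX_BYTES should mirror the broker's message.max.bytes setting.
class PayloadGuard {
    static final int MAX_BYTES = 1_000_000; // ~1 MB, roughly Kafka's default limit

    // True if the serialized record can be produced safely. Checks the
    // pre-compression size, so it is conservative.
    static boolean fits(byte[] serialized) {
        return serialized.length <= MAX_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fits(new byte[100]));       // prints true
        System.out.println(fits(new byte[2_000_000])); // prints false
    }
}
```

In a job this check could run in a flatMap (or filter) between the window operator and the KafkaSink, so an oversized window result is skipped instead of failing the producer.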

On Fri, Feb 17, 2023 at 9:54 AM yuxia  wrote:

> Hi, Hatem.
> I think there is no way to catch the exception and then ignore it in
> current implementation for KafkaSink.  You may also need to extend the
> KafkaSink.
>
> Best regards,
> Yuxia
>
> --
> *From: *"Hatem Mostafa" 
> *To: *"User" 
> *Sent: *Thursday, Feb 16, 2023, 9:32:44 PM
> *Subject: *KafkaSink handling message size produce errors
>
> Hello,
> I am writing a flink job that reads and writes into kafka, it is using a
> window operator and eventually writing the result of the window into a
> kafka topic. The accumulated data can exceed the maximum message size after
> compression on the producer level. I want to be able to catch the exception
> coming from the producer and ignore this window. I could not find a way to
> do that in KafkaSink, is there a way to do so?
>
> I attached here an example of an error that I would like to handle
> gracefully.
>
> [image: image.png]
>
>
> This question is similar to one that was asked on stackoverflow here, but
> the answer is relevant for older versions of flink.
>
> Regards,
> Hatem
>
>


Re: Disable the chain of the Sink operator

2023-02-16 Thread wudi
Thank you for your reply
But in my local test environment (Flink 1.15 and 1.16), when the chain of
writer and committer is disabled, the back pressure is reduced.

The specific phenomenon is as follows:
1. After ck-4 is completed, the commit execution is very slow
2. At this time, the [Sink: Writer (1/1)#0] thread will continue to call the 
SinkWriter.write() method to receive upstream data.
3. After triggering ck-5, the prepareCommit and snapshotState methods will be 
executed
4. Because the last commit has not been completed, the [Sink: Committer 
(1/1)#0] thread will wait to call the commit method.
5. SinkWriter.write() can still continue to receive upstream data
6. ck-6 will not be triggered until the first commit has completed
The whole process will not block the method of SinkWriter.write().

However, if the chain is not disabled, receiving upstream data will be blocked 
in the second step.
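The steps above can be sketched as a toy two-thread model: once writer and committer run on separate threads with a buffer in between, a slow commit no longer blocks write(). This only illustrates the thread handoff, not Flink's actual sink runtime:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model: writer and committer decoupled by a queue, so a slow commit
// does not block the writer. Illustrative only, not Flink's sink runtime.
class UnchainedSinkDemo {
    // The "writer": hands records to the committer's queue; offer() on an
    // unbounded queue returns immediately, so the writer is never blocked.
    static boolean writeAll(BlockingQueue<Integer> committables, int n) {
        boolean neverBlocked = true;
        for (int i = 0; i < n; i++) {
            neverBlocked &= committables.offer(i);
        }
        return neverBlocked;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> committables = new LinkedBlockingQueue<>();

        // The "committer": drains the queue very slowly, as in the ck-4 case.
        Thread committer = new Thread(() -> {
            try {
                while (true) {
                    committables.take();
                    Thread.sleep(100); // slow commit
                }
            } catch (InterruptedException ignored) {
                // daemon thread; just exit
            }
        });
        committer.setDaemon(true);
        committer.start();

        System.out.println(writeAll(committables, 50)); // prints true
    }
}
```

Whether such decoupling keeps the exactly-once guarantees of a real two-phase-commit sink is the open question in this thread; the model only shows the back-pressure behaviour being described.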


Thanks && Regards,
di.wu

> On Feb 17, 2023, at 9:40 AM, Shammon FY wrote:
> 
> Hi wudi
> 
> I'm afraid it cannot reduce back pressure. If Writer and Commiter are not
> chained together, and the Commiter task runs slowly, it can block the
> upstream Writer tasks by back pressure too.
> 
> On the other hand, you can try to increase the parallelism of sink node to
> speedup the Commiter operation
> 
> Best,
> Shammon
> 
> On Thu, Feb 16, 2023 at 11:38 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> thanks for your replies.
>> I think that if Writer and Commiter are not chained together, data
>> consistency can be guaranteed, right?
>> Because when the Commiter does not block the Writer, at the next
>> Checkpoint, if the Commit is not completed, the SinkWriter.precommit will
>> not be triggered
>> 
>> In addition, in this scenario (writer blocking caused by slow commit), may
>> the performance of disabling Sink's chain be better? Because it can reduce
>> a lot of back pressure.
>> 
>> Thanks && Regards,
>> di.wu
>> 
>> 
>>> On Feb 16, 2023, at 10:05 PM, Chesnay Schepler wrote:
>>> 
>>> As far as I know that chain between committer and writer is also
>> required for correctness.
>>> 
>>> On 16/02/2023 14:53, weijie guo wrote:
 Hi wu,
 
 I don't think it is a good choice to directly change the strategy of
>> chain. Operator chain usually has better performance and resource
>> utilization. If we directly change the chain policy between them, users can
>> no longer chain them together, which is not a good starting point.
 
 Best regards,
 
 Weijie
 
 
 
  wudi <676366...@qq.com> wrote on Thu, Feb 16, 2023 at 19:29:
 
   Thank you for your reply.
 
   Currently in the custom Sink Connector, the Flink task will
   combine Writer and Committer into one thread, and the thread name
   is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
   In this way, when the *Committer.commit()* method is very slow, it
   will block the*SinkWriter.write()* method to receive upstream data.
 
   The client can use the *env.disableOperatorChaining() *method to
   split the thread into two threads:*[Sink: Writer (1/1)#0] *and
   *[Sink: Committer (1/1)#0]*. This Committer. The commit method
   will not block the SinkWriter.write method.
 
   If the chain policy can be disabled in the custom Sink Connector,
   the client can be prevented from setting and disabling the chain.
   Or is there a better way to make*Committer.commit()* not block
   *SinkWriter.write()*?
 
   Looking forward for your reply.
   Thanks && Regards,
   di.wu
 
>   On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:
> 
>   Hi
> 
>   Do you mean how to disable `chain` in your custom sink
>   connector?  Can you
>   give an example of what you want?
> 
>   On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
> 
>>   Hello
>> 
>>   The current Sink operator will be split into two operations,
>>   Writer and
>>   Commiter. By default, they will be chained together and executed
>>   on the
>>   same thread.
>>   So sometimes when the commiter is very slow, it will block the data
>>   writer, causing back pressure.
>> 
>>   At present, FlinkSQL can be solved by disabling the chain
>>   globally, and
>>   DataStream can partially disable the chain through the
>>   disableChaining
>>   method, but both of them need to be set by the user.
>> 
>>   Can the strategy of the Chain be changed in the Custom Sink
>>   Connector to
>>   separate Writer and Commiter?
>> 
>>   Thanks && Regards,
>>   di.wu
>> 
> 
 
>>> 
>> 
>> 
> 



Re: Imbalance in Garbage Collection for TaskManagers

2023-02-16 Thread Weihua Hu
Hi, Meghajit

What kind of session cluster are you using? Standalone or Native?
If it's standalone, maybe you can check if TaskManager with heavy gc is
running more tasks than others. If so, we can enable
"cluster.evenly-spread-out-slots=true" to balance tasks in all task
managers.

Best,
Weihua
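A minimal flink-conf.yaml sketch of the option mentioned above (it applies to standalone clusters; verify against your Flink version's configuration reference):

```yaml
# flink-conf.yaml (sketch): spread tasks evenly across registered TaskManagers
cluster.evenly-spread-out-slots: true
```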


On Thu, Feb 16, 2023 at 10:52 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We have a Flink session cluster deployment in Kubernetes of around 100
> TaskManagers. It processes around 20-30 Kafka Source jobs at the moment.
> The jobs run are all using the same jar and only differ in the SQL query
> used and other UDFs. We are using the official flink:1.14.3 image.
>
> We observed that one specific task manager has been doing more garbage
> collection compared to the others. So much so, in fact, that at a specific
> hour of the day it pauses execution to do GC and thus causes huge consumer
> lag to build up. By garbage collection, I mean GC of the Young Generation;
> the Old Generation GC looks fine.
>
> We checked our other running Flink clusters and found that most of them
> actually show this behaviour. In fact, there are always 2-3 TaskManagers
> which seem to be doing more GC than the others.
>
> Is this a known issue? Our clusters run long-running Kafka-source-to-Kafka-sink
> jobs, so we wanted to know if this can happen because of that.
>
> Would appreciate any kind of guidance.
> --
> *Regards,*
> *Meghajit*
>


Re: KafkaSink handling message size produce errors

2023-02-16 Thread yuxia
Hi, Hatem.
I think there is no way to catch the exception and then ignore it in the
current implementation of KafkaSink. You may also need to extend the
KafkaSink.

Best regards,
Yuxia


From: "Hatem Mostafa" 
To: "User" 
Sent: Thursday, Feb 16, 2023, 9:32:44 PM
Subject: KafkaSink handling message size produce errors

Hello,
I am writing a Flink job that reads from and writes to Kafka. It uses a
window operator and eventually writes the result of the window into a Kafka
topic. The accumulated data can exceed the maximum message size after
compression on the producer level. I want to be able to catch the exception
coming from the producer and ignore this window. I could not find a way to
do that in KafkaSink
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink),
is there a way to do so?

I attached here an example of an error that I would like to handle
gracefully.

This question is similar to one that was asked on stackoverflow
(https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink)
but the answer is relevant for older versions of flink.

Regards,
Hatem



Table API: Converting ChangelogMode.all() Table to ChangelogMode.insertOnly()

2023-02-16 Thread Yaroslav Tkachenko
Hi everyone,

In my Flink application, I have a table created with ChangelogMode.all().

One of the sinks I want to use requires ChangelogMode.insertOnly().

The only solution that comes to mind is converting my table to a DataStream
of Rows, filtering out using RowKind and converting it back to a table. It
seems pretty inefficient.

I know it's currently not possible to access RowKind from a Table API, but
I'm wondering if there is another, more efficient approach here?

Thank you.
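The DataStream detour described above can be modeled in a self-contained way: filter a changelog down to INSERT rows only. With Flink's Table API this would be `toChangelogStream(table)`, a filter on `RowKind`, then `fromDataStream(...)`; plain Java here so the idea stands alone. Which kinds to keep (e.g. whether UPDATE_AFTER should pass through as an insert) depends on the sink's semantics:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Self-contained model of "changelog -> insert-only": keep only INSERT rows.
// Mirrors filtering on RowKind in a DataStream<Row>; illustrative only.
class ChangelogFilter {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static class Change {
        final Kind kind;
        final String row;
        Change(Kind kind, String row) { this.kind = kind; this.row = row; }
    }

    static List<Change> insertsOnly(List<Change> changelog) {
        List<Change> out = new ArrayList<>();
        for (Change c : changelog) {
            if (c.kind == Kind.INSERT) {
                out.add(c); // drop retractions, updates and deletes
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> log = Arrays.asList(
                new Change(Kind.INSERT, "a"),
                new Change(Kind.UPDATE_BEFORE, "a"),
                new Change(Kind.UPDATE_AFTER, "a2"),
                new Change(Kind.INSERT, "b"));
        System.out.println(insertsOnly(log).size()); // prints 2
    }
}
```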


Re: Disable the chain of the Sink operator

2023-02-16 Thread Shammon FY
Hi wudi

I'm afraid it cannot reduce back pressure. If Writer and Commiter are not
chained together, and the Commiter task runs slowly, it can block the
upstream Writer tasks by back pressure too.

On the other hand, you can try to increase the parallelism of sink node to
speedup the Commiter operation

Best,
Shammon

On Thu, Feb 16, 2023 at 11:38 PM wudi <676366...@qq.com.invalid> wrote:

> thanks for your replies.
> I think that if Writer and Commiter are not chained together, data
> consistency can be guaranteed, right?
> Because when the Commiter does not block the Writer, at the next
> Checkpoint, if the Commit is not completed, the SinkWriter.precommit will
> not be triggered
>
> In addition, in this scenario (writer blocking caused by slow commit), may
> the performance of disabling Sink's chain be better? Because it can reduce
> a lot of back pressure.
>
> Thanks && Regards,
> di.wu
>
>
> > On Feb 16, 2023, at 10:05 PM, Chesnay Schepler wrote:
> >
> > As far as I know that chain between committer and writer is also
> required for correctness.
> >
> > On 16/02/2023 14:53, weijie guo wrote:
> >> Hi wu,
> >>
> >> I don't think it is a good choice to directly change the strategy of
> chain. Operator chain usually has better performance and resource
> utilization. If we directly change the chain policy between them, users can
> no longer chain them together, which is not a good starting point.
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >>
> >> wudi <676366...@qq.com> wrote on Thu, Feb 16, 2023 at 19:29:
> >>
> >>Thank you for your reply.
> >>
> >>Currently in the custom Sink Connector, the Flink task will
> >>combine Writer and Committer into one thread, and the thread name
> >>is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
> >>In this way, when the *Committer.commit()* method is very slow, it
> >>will block the*SinkWriter.write()* method to receive upstream data.
> >>
> >>The client can use the *env.disableOperatorChaining() *method to
> >>split the thread into two threads:*[Sink: Writer (1/1)#0] *and
> >>*[Sink: Committer (1/1)#0]*. This Committer. The commit method
> >>will not block the SinkWriter.write method.
> >>
> >>If the chain policy can be disabled in the custom Sink Connector,
> >>the client can be prevented from setting and disabling the chain.
> >>Or is there a better way to make*Committer.commit()* not block
> >>*SinkWriter.write()*?
> >>
> >>Looking forward for your reply.
> >>Thanks && Regards,
> >>di.wu
> >>
> >>>On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:
> >>>
> >>>Hi
> >>>
> >>>Do you mean how to disable `chain` in your custom sink
> >>>connector?  Can you
> >>>give an example of what you want?
> >>>
> >>>On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
> >>>
> Hello
> 
> The current Sink operator will be split into two operations,
> Writer and
> Commiter. By default, they will be chained together and executed
> on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
> 
> At present, FlinkSQL can be solved by disabling the chain
> globally, and
> DataStream can partially disable the chain through the
> disableChaining
> method, but both of them need to be set by the user.
> 
> Can the strategy of the Chain be changed in the Custom Sink
> Connector to
> separate Writer and Commiter?
> 
> Thanks && Regards,
> di.wu
> 
> >>>
> >>
> >
>
>



Re: Flink SQL UDF for transforming array elements

2023-02-16 Thread casel.chen
This is probably not supported at the moment. One workaround is to use the
concat function to turn the array into a string as the UDF input, then split
it back into an array inside your UDF for processing.
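A self-contained sketch of that workaround (plain Java, no Flink dependency; the comma delimiter and the "+1" transform are arbitrary examples — inside a real ScalarFunction's eval the same split/transform/join would apply):

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Sketch of the "array as joined string" workaround: the array arrives as a
// delimiter-joined string (e.g. built with a concat function in SQL), the UDF
// splits it, transforms each element, and joins the result back.
class ArrayAsStringTransform {
    static String transform(String joined) {
        return Arrays.stream(joined.split(","))
                .map(s -> String.valueOf(Long.parseLong(s) + 1)) // example transform
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(transform("1,2,3")); // prints 2,3,4
    }
}
```

The delimiter must be chosen so it cannot appear inside an element, which is the main fragility of this approach.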


On 2023-02-15 16:29:19, "723849736" <723849...@qq.com.INVALID> wrote:
>Hi all,
>
>When using Flink SQL I have a scenario where I need to apply a transformation
>to the elements of an array column, similar to the transform function in
>Spark SQL:
>
>
>https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
>Flink SQL does not seem to support anything similar at the moment. Can this
>be implemented with a UDF?
>
>
>The problem is that this function needs to take a function as input, and a
>function-typed parameter is not a Flink data type, so an exception is thrown
>during the validate phase. Is there a way around this?
>
>
>class ArrayTransformFunction extends ScalarFunction {
>
>  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
>    a.map(e => function(e))
>  }}
>The exception is as follows:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
>validation failed. An error occurred in the type inference logic of function 
>'transform'.
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>   at SQLTest$.main(SQLTest.scala:44)
>   at SQLTest.main(SQLTest.scala)
>Caused by: org.apache.flink.table.api.ValidationException: An error occurred 
>in the type inference logic of function 'transform'.
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>   at java.util.Optional.flatMap(Optional.java:241)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>   at 
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>   ... 6 more
>Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
>valid type inference for function class 'udf.ArrayTransformFunction'. Please 
>check for implementation mistakes and/or provide a corresponding hint.
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>   at 
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>   ... 17 more
>Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
>a signature to output mapping.
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>   ... 20 more
>Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a 
>type inference from method:
>public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> 

Re: Disable the chain of the Sink operator

2023-02-16 Thread wudi
Thanks for your replies.
I think that even if the Writer and Committer are not chained together, data
consistency can still be guaranteed, right? Because when the Committer does
not block the Writer, then at the next checkpoint, if the commit has not
completed, SinkWriter.precommit will not be triggered.

In addition, in this scenario (writer blocking caused by a slow commit), might
disabling the Sink's chain perform better? Because it can relieve a lot of
back pressure.

Thanks && Regards,
di.wu


> On Feb 16, 2023, at 10:05 PM, Chesnay Schepler wrote:
> 
> As far as I know that chain between committer and writer is also required for 
> correctness.
> 
> On 16/02/2023 14:53, weijie guo wrote:
>> Hi wu,
>> 
>> I don't think it is a good choice to directly change the strategy of chain. 
>> Operator chain usually has better performance and resource utilization. If 
>> we directly change the chain policy between them, users can no longer chain 
>> them together, which is not a good starting point.
>> 
>> Best regards,
>> 
>> Weijie
>> 
>> 
>> 
wudi <676366...@qq.com> wrote on Thu, Feb 16, 2023 at 19:29:
>> 
>>Thank you for your reply.
>> 
>>Currently in the custom Sink Connector, the Flink task will
>>combine Writer and Committer into one thread, and the thread name
>>is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
>>In this way, when the *Committer.commit()* method is very slow, it
>>will block the *SinkWriter.write()* method from receiving upstream data.
>> 
>>The client can use the *env.disableOperatorChaining()* method to
>>split the thread into two threads: *[Sink: Writer (1/1)#0]* and
>>*[Sink: Committer (1/1)#0]*. This way, the Committer.commit method
>>will not block the SinkWriter.write method.
>> 
>>If the chain policy can be disabled inside the custom Sink Connector,
>>the client can be spared from having to disable the chain itself.
>>Or is there a better way to make *Committer.commit()* not block
>>*SinkWriter.write()*?
>> 
>>Looking forward for your reply.
>>Thanks && Regards,
>>di.wu
>> 
>>>On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:
>>> 
>>>Hi
>>> 
>>>Do you mean how to disable `chain` in your custom sink
>>>connector?  Can you
>>>give an example of what you want?
>>> 
>>>On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
>>> 
Hello
 
The current Sink operator will be split into two operations,
Writer and
Commiter. By default, they will be chained together and executed
on the
same thread.
So sometimes when the commiter is very slow, it will block the data
writer, causing back pressure.
 
At present, FlinkSQL can be solved by disabling the chain
globally, and
DataStream can partially disable the chain through the
disableChaining
method, but both of them need to be set by the user.
 
Can the strategy of the Chain be changed in the Custom Sink
Connector to
separate Writer and Commiter?
 
Thanks && Regards,
di.wu
 
>>> 
>> 
> 
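To illustrate the threading effect under discussion (plain Java, not the Flink sink API): once the committer runs on its own thread behind a bounded hand-off queue, the writer only stalls when the queue fills up, instead of stalling on every slow commit. This is a conceptual sketch only; it says nothing about the exactly-once guarantees the chained design protects.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only (not Flink code): a writer hands committables to a
// committer thread through a bounded queue. The writer keeps making
// progress until the queue is full. The simulated commit delay is omitted
// so the demo stays deterministic and fast.
public class WriterCommitterDemo {
    public static int run(int records) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        AtomicInteger committed = new AtomicInteger();
        Thread committer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    queue.take();                 // receive a committable
                    committed.incrementAndGet();  // "commit" it
                }
            } catch (InterruptedException ignored) {
            }
        });
        committer.start();
        try {
            for (int i = 0; i < records; i++) {
                queue.put(i); // writer blocks only when the queue is full
            }
            committer.join(); // wait until everything is committed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return committed.get();
    }
}
```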




Imbalance in Garbage Collection for TaskManagers

2023-02-16 Thread Meghajit Mazumdar
Hello,

We have a Flink session cluster deployment in Kubernetes of around 100
TaskManagers. It processes around 20-30 Kafka Source jobs at the moment.
The jobs run are all using the same jar and only differ in the SQL query
used and other UDFs. We are using the official flink:1.14.3 image.

We observed that one specific TaskManager has been doing far more garbage
collection than the others. So much, actually, that at a specific hour of
the day it pauses execution to do GC and thus causes huge consumer lag to
build up. By garbage collection, I mean GC of the young generation.
The old generation GC looks fine.

We checked this in our other running Flink clusters and found that actually
in most of them, this behaviour is being seen. In fact, there are always
2-3 TaskManagers which seem to be doing more GC than the others.

Is this a known issue? Our clusters run long-running Kafka-source-to-Kafka-sink
jobs, so we wanted to know if this can happen because of that.

Would appreciate any kind of guidance.
-- 
*Regards,*
*Meghajit*
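For reference, per-TaskManager GC behaviour can usually be made visible without code changes by passing JVM GC-logging flags through the Flink configuration. A sketch (the flag syntax assumes a JDK 11+ unified-logging JVM; the path and rotation settings are examples, not recommendations):

```yaml
# flink-conf.yaml (sketch): write rotated GC logs per TaskManager for offline analysis
env.java.opts.taskmanager: "-Xlog:gc*:file=/opt/flink/log/gc.log:time,uptime,level,tags:filecount=5,filesize=20m"
```

GC logging itself has negligible runtime overhead; the resulting logs can then be compared across the "noisy" and "quiet" TaskManagers.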


DataStream Join produces no output and causes program crash

2023-02-16 Thread Reme Ajayi
Hi,
I am trying to join two Kafka data streams and output the result to another
Kafka topic, but my joined stream does not output any data. After some time,
my program crashes and runs out of memory, which I think is a result of the
join not working. My code doesn't throw any errors, but the join doesn't
produce any output. My join logic is below; please suggest possible solutions.

P.S. Things I have tried so far:

   1. Increased task slots on the task manager
   2. Added Watermarks to my Kafka sources

DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
        .where(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("la_id").toString();
            }
        })
        .equalTo(new KeySelector<GenericRecord, String>() {
            @Override
            public String getKey(GenericRecord value) throws Exception {
                return value.get("id").toString();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
            @Override
            public Enhanced join(GenericRecord first, GenericRecord second)
                    throws Exception {
                return new Enhanced(
                        Long.parseLong(first.get("c_at").toString()),
                        first.get("c_type").toString(),
                        first.get("id").toString(),
                        Integer.parseInt(first.get("d_cts").toString()),
                        Integer.parseInt(first.get("c_cts").toString()),
                        second.get("prov").toString(),
                        second.get("bb_S_T").toString(),
                        second.get("p_id").toString(),
                        second.get("s_ccurr").toString());
            }
        });
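One common cause of a tumbling event-time window emitting nothing is that the watermark never advances past the window end, so the window never fires, and the buffered window state keeps growing, which would also explain the out-of-memory crash. The intuition behind bounded-out-of-orderness watermarks (the usual strategy for Kafka sources) can be sketched in plain Java; this is an illustration of the concept, not Flink's WatermarkStrategy API.

```java
// Illustration of bounded-out-of-orderness watermarking: the watermark
// trails the highest event timestamp seen so far by a fixed bound. A
// tumbling window [start, end) may fire only once the watermark reaches
// end - 1, so if timestamps stop advancing, the window never fires.
public class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    // Called for every record; returns the current watermark.
    public long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - boundMillis;
    }

    // A window [start, end) can fire once the watermark passes end - 1.
    public boolean windowCanFire(long windowEndExclusive) {
        return maxTimestamp - boundMillis >= windowEndExclusive - 1;
    }
}
```

With a 30-second window and a 5-second bound, a window covering [0s, 30s) fires only after an event with timestamp >= ~35s arrives on every parallel source partition, so idle Kafka partitions are another thing worth checking.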


Re: Disable the chain of the Sink operator

2023-02-16 Thread Chesnay Schepler
As far as I know that chain between committer and writer is also 
required for correctness.


On 16/02/2023 14:53, weijie guo wrote:

Hi wu,

I don't think it is a good choice to directly change the strategy of 
chain. Operator chain usually has better performance and resource 
utilization. If we directly change the chain policy between them, 
users can no longer chain them together, which is not a good starting 
point.


Best regards,

Weijie



wudi <676366...@qq.com> wrote on Thu, Feb 16, 2023 at 19:29:

Thank you for your reply.

Currently in the custom Sink Connector, the Flink task will
combine Writer and Committer into one thread, and the thread name
is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
In this way, when the *Committer.commit()* method is very slow, it
will block the *SinkWriter.write()* method from receiving upstream data.

The client can use the *env.disableOperatorChaining()* method to
split the thread into two threads: *[Sink: Writer (1/1)#0]* and
*[Sink: Committer (1/1)#0]*. This way, the Committer.commit method
will not block the SinkWriter.write method.

If the chain policy can be disabled inside the custom Sink Connector,
the client can be spared from having to disable the chain itself.
Or is there a better way to make *Committer.commit()* not block
*SinkWriter.write()*?

Looking forward for your reply.
Thanks && Regards,
di.wu


On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:

Hi

Do you mean how to disable `chain` in your custom sink
connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:


Hello

The current Sink operator will be split into two operations,
Writer and
Commiter. By default, they will be chained together and executed
on the
same thread.
So sometimes when the commiter is very slow, it will block the data
writer, causing back pressure.

At present, FlinkSQL can be solved by disabling the chain
globally, and
DataStream can partially disable the chain through the
disableChaining
method, but both of them need to be set by the user.

Can the strategy of the Chain be changed in the Custom Sink
Connector to
separate Writer and Commiter?

Thanks && Regards,
di.wu








Re: Disable the chain of the Sink operator

2023-02-16 Thread weijie guo
Hi wu,

I don't think it is a good choice to directly change the strategy of chain.
Operator chain usually has better performance and resource utilization. If
we directly change the chain policy between them, users can no longer chain
them together, which is not a good starting point.

Best regards,

Weijie


wudi <676366...@qq.com> wrote on Thu, Feb 16, 2023 at 19:29:

> Thank you for your reply.
>
> Currently in the custom Sink Connector, the Flink task will combine Writer
> and Committer into one thread, and the thread name is similar: *[Sink:
> Writer -> Sink: Committer (1/1)#0]*.
> In this way, when the *Committer.commit()* method is very slow, it will
> block the *SinkWriter.write()* method from receiving upstream data.
>
> The client can use the *env.disableOperatorChaining()* method to split
> the thread into two threads: *[Sink: Writer (1/1)#0]* and *[Sink:
> Committer (1/1)#0]*. This way, the Committer.commit method will not block
> the SinkWriter.write method.
>
> If the chain policy can be disabled inside the custom Sink Connector, the
> client can be spared from having to disable the chain itself. Or is there
> a better way to make *Committer.commit()* not block *SinkWriter.write()*?
>
> Looking forward for your reply.
> Thanks && Regards,
> di.wu
>
> On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:
>
> Hi
>
> Do you mean how to disable `chain` in your custom sink connector?  Can you
> give an example of what you want?
>
> On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
>
> Hello
>
> The current Sink operator will be split into two operations, Writer and
> Commiter. By default, they will be chained together and executed on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
>
> At present, FlinkSQL can be solved by disabling the chain globally, and
> DataStream can partially disable the chain through the disableChaining
> method, but both of them need to be set by the user.
>
> Can the strategy of the Chain be changed in the Custom Sink Connector to
> separate Writer and Commiter?
>
> Thanks && Regards,
> di.wu
>
>
>
>



Re: Disable the chain of the Sink operator

2023-02-16 Thread wudi
Thank you for your reply.

Currently in the custom Sink Connector, the Flink task will combine Writer and 
Committer into one thread, and the thread name is similar: [Sink: Writer -> 
Sink: Committer (1/1)#0].
In this way, when the Committer.commit() method is very slow, it will block the
SinkWriter.write() method from receiving upstream data.

The client can use the env.disableOperatorChaining() method to split the thread
into two threads: [Sink: Writer (1/1)#0] and [Sink: Committer (1/1)#0]. This way,
the Committer.commit method will not block the SinkWriter.write method.

If the chain policy can be disabled inside the custom Sink Connector, the client
can be spared from having to disable the chain itself. Or is there a better way
to make Committer.commit() not block SinkWriter.write()?

Looking forward for your reply.
Thanks && Regards,
di.wu

> On Feb 16, 2023, at 6:54 PM, Shammon FY wrote:
> 
> Hi
> 
> Do you mean how to disable `chain` in your custom sink connector?  Can you
> give an example of what you want?
> 
> On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:
> 
>> Hello
>> 
>> The current Sink operator will be split into two operations, Writer and
>> Commiter. By default, they will be chained together and executed on the
>> same thread.
>> So sometimes when the commiter is very slow, it will block the data
>> writer, causing back pressure.
>> 
>> At present, FlinkSQL can be solved by disabling the chain globally, and
>> DataStream can partially disable the chain through the disableChaining
>> method, but both of them need to be set by the user.
>> 
>> Can the strategy of the Chain be changed in the Custom Sink Connector to
>> separate Writer and Commiter?
>> 
>> Thanks && Regards,
>> di.wu
>> 
> 





Re: Disable the chain of the Sink operator

2023-02-16 Thread Shammon FY
Hi

Do you mean how to disable `chain` in your custom sink connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <676366...@qq.com> wrote:

> Hello
>
> The current Sink operator will be split into two operations, Writer and
> Commiter. By default, they will be chained together and executed on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
>
> At present, FlinkSQL can be solved by disabling the chain globally, and
> DataStream can partially disable the chain through the disableChaining
> method, but both of them need to be set by the user.
>
> Can the strategy of the Chain be changed in the Custom Sink Connector to
> separate Writer and Commiter?
>
> Thanks && Regards,
> di.wu
>



Re: Flink SQL UDF for transforming array elements

2023-02-16 Thread Shammon FY
Hi

You could consider bundling that function into your UDF jar and calling it directly from your custom UDF?



On Wed, Feb 15, 2023 at 4:29 PM 723849736 <723849...@qq.com.invalid> wrote:

> Hi all,
>
> I have a scenario when using Flink SQL where I need to apply a transformation to every element of an array column, similar to the transform function in Spark SQL
>
>
> https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
> Flink SQL does not seem to support this. Can it be implemented with a UDF?
>
>
> Since this function needs to take a function as a parameter, and a function type is not a Flink data type, an exception is thrown during the validate phase. Is there a way around this?
>
>
> class ArrayTransformFunction extends ScalarFunction {
>
>   def eval(a: Array[Long], function: Long => Long): Array[Long] = {
>     a.map(e => function(e))
>   }}
> The exception is as follows
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type inference logic of
> function 'transform'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
> at SQLTest$.main(SQLTest.scala:44)
> at SQLTest.main(SQLTest.scala)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'transform'.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
> ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'udf.ArrayTransformFunction'. Please check for implementation mistakes
> and/or provide a corresponding hint.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
> at
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
> ... 17 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to output mapping.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
> at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
> ... 20 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> 

Re: Flink on YARN: "TaskManager with id is no longer reachable" after running for a while

2023-02-16 Thread Shammon FY
Hi

For the TM heartbeat "unreachable" error above, the TaskManager has usually exited; check the logs for the exit reason.
For the checkpoint timeout, check whether the job is backpressured, look at how long checkpoints actually take, and consider increasing the checkpoint timeout.
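A sketch of the relevant configuration knobs (values are examples, not recommendations, and should be tuned to the actual checkpoint durations observed):

```yaml
# flink-conf.yaml (sketch): give slow checkpoints more headroom
execution.checkpointing.timeout: 30min    # default is 10min
execution.checkpointing.min-pause: 1min   # breathing room between checkpoints
heartbeat.timeout: 180000                 # ms; default 50000 in Flink 1.14
```

Raising the heartbeat timeout only masks the symptom if the TaskManager is actually being killed (e.g. by the YARN container memory limit), so the exit reason should be confirmed first.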

Best,
Shammon


On Thu, Feb 16, 2023 at 10:34 AM lxk  wrote:

> Hi, you can take a heap dump and analyze the memory.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-02-16 10:05:19, "Fei Han" wrote:
> >@all
> >Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on YARN for a while, the following error appears:
> >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with
> id container_e506_1673750933366_49579_01_02(
> hdp-server-010.yigongpin.com:8041) is no longer reachable. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> ~[?:1.8.0_181] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
> >After the error above, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing. at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
> [flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181].
> >Could anyone advise how these two problems can be tuned? Is there a good approach?
>