RE: Flink operator task opens threads internally

2023-08-03 Thread Kamal Mittal via user
Hello,

I hope the context of the use case is clear now, as described in the mail below and
again here –

The Flink application opens a server socket on a fixed port as part of a custom
source, and clients open client sockets toward it. To read data from each client
socket, the application opens a thread after accepting the connection; there can be
up to 1000 such connections.

The Flink server socket application is running in a K8s environment.

Could you please let me know whether it is OK to open multiple threads for accepting
client socket connections inside the Flink application?

Rgds,
Kamal

From: Kamal Mittal via user 
Sent: 03 August 2023 09:27 AM
To: Paul Lam 
Cc: user@flink.apache.org
Subject: RE: Flink operator task opens threads internally

Hello,

We have a client sending TCP traffic towards the Flink application, and to support
that a server socket (on a configurable port) is opened to accept socket
connections.

So, to accept multiple client connections, we use a thread pool (of configurable
size) which executes in a Flink task slot.

Rgds,
Kamal

From: Paul Lam <paullin3...@gmail.com>
Sent: 03 August 2023 08:59 AM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Flink operator task opens threads internally

Hi Kamal,

It’s okay if you don’t mind the data order.

But it’s not very commonly seen to accept client sockets from Flink jobs, as
the socket server address is dynamic and requires service discovery.

Would you like to share more about the background?

Best,
Paul Lam

On Aug 3, 2023 at 10:26, Kamal Mittal via user
<user@flink.apache.org> wrote:

Hello Community,

Please share views for the below mail.

Rgds,
Kamal

From: Kamal Mittal via user
<user@flink.apache.org>
Sent: 02 August 2023 08:19 AM
To: user@flink.apache.org
Subject: Flink operator task opens threads internally

Hello Community,

I have an operator pipeline like the one below. Is it OK if the “source” task opens
threads via a Java thread pool to parallelize the work?

This is needed for accepting multiple client socket connections in a single custom
source server socket function.

Single custom source server socket function – ServerSocket serverSocket = new
ServerSocket(port);
Now, using a thread pool, accept multiple connections on separate threads –
pool.submit(() -> serverSocket.accept());
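For what it's worth, the accept-then-dispatch pattern above can be sketched in plain Java as below. This is outside any Flink API; the class name, pool size, and use of an ephemeral port are all illustrative, and in a real SourceFunction the accept loop would live in run() and honor cancel():

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SocketAcceptSketch {
    public static void main(String[] args) throws Exception {
        // Pool size is illustrative; the real application makes it configurable.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Port 0 picks a free port here; the real source binds a fixed port.
        try (ServerSocket server = new ServerSocket(0)) {
            // Simulate one client; the pattern is the same for 1000 clients.
            Socket client = new Socket("localhost", server.getLocalPort());
            OutputStream out = client.getOutputStream();
            out.write("hello\n".getBytes());
            out.flush();

            // Accept on the main thread, then hand the per-connection read
            // to a pool thread -- the pattern asked about in this thread.
            Socket accepted = server.accept();
            pool.submit(() -> {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(accepted.getInputStream()))) {
                    System.out.println("received: " + in.readLine());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            client.close();
        }
    }
}
```

Note that records produced from such worker threads must be emitted under the source's checkpoint lock to keep checkpointing consistent.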




Rgds,
Kamal



Re: Investigating use of Custom Trigger to smooth CPU usage

2023-08-03 Thread David Anderson
There's already a built-in concept of WindowStagger that provides an
interface for accomplishing this.

It's not as well integrated (or documented) as it might be, but the
basic mechanism exists. To use it, I believe you would do something
like this:

assigner = TumblingEventTimeWindows.of(
    Time.seconds(5), Time.seconds(0), WindowStagger.RANDOM);

foo.keyBy(...)
  .window(assigner)
  ...

The different stagger strategies are documented in [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html

On Wed, Aug 2, 2023 at 7:13 PM xiangyu feng  wrote:
>
> Hi Tucker,
>
> Can you describe more about your running job and how the trigger timer is 
> configured? Also it would be better if you can attach a FlameGraph to show 
> the CPU usage when the timer is triggered.
>
> Best,
> Xiangyu
>
> Tucker Harvey via user wrote on Tue, Aug 1, 2023 at 05:51:
>>
>> Hello Flink community! My team is trying to optimize CPU usage on a running 
>> job, and we're exploring the option of offsetting the trigger time for 
>> smoother CPU patterns. Since adjusting the window will compromise job 
>> correctness, we plan to pursue a custom trigger implementation. We were 
>> curious if the community had any thoughts or insights on this issue.
>>
>>


Re: Severe full GC in a job

2023-08-03 Thread Shammon FY
Hi,

Generally you first need to confirm which area is causing the full GC, e.g. metaspace or an oversized heap. If it is caused by an oversized heap, you can dump the heap and use analysis tools such as MAT or VisualVM to see which objects take up the most space and whether there is a memory leak.

Best,
Shammon FY

On Fri, Aug 4, 2023 at 10:00 AM yidan zhao  wrote:

> Check the GC logs for the GC cause.
>
> 2278179732 <2278179...@qq.com.invalid> wrote on Thu, Aug 3, 2023 at 13:59:
> >
> > Hi all, after the job runs for a while it occasionally shows backpressure, and full GC looks severe. Are there any good tools or experience docs for troubleshooting this? Thanks!
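As a hedged sketch of the GC-log and heap-dump advice above, GC diagnostics can be enabled through Flink's standard JVM-options settings in flink-conf.yaml (the log-file paths and sizes are illustrative):

```yaml
# Unified GC logging (JDK 11+) on the TaskManagers, for finding the GC cause
env.java.opts.taskmanager: "-Xlog:gc*:file=/tmp/tm-gc.log:time,uptime:filecount=5,filesize=20M"
# Dump the heap on OutOfMemoryError so it can be opened in MAT or VisualVM
env.java.opts.jobmanager: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"
```

`env.java.opts.taskmanager` and `env.java.opts.jobmanager` are standard Flink configuration keys; a live heap can also be dumped on demand with `jmap` against the TaskManager process.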
>


Flink restored from an initially-specified checkpoint

2023-08-03 Thread Filip Karnicki
Hi, we recently went live with a job on a shared cluster managed with YARN.

The job was started using

flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE


Everything worked fine for a few days, but then the job needed to be
restored for whatever reason:

2023-08-03 16:34:44,525 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
code 2.

org.apache.flink.util.FlinkException: Cannot deregister application.
Resource manager service is not available.


It seems that while restoring (yarn 'attempt' 02), Flink used the original
checkpoint we provided as the value of the -s parameter, and not the most
recent checkpoint for that job. This caused a few days worth of data to be
reprocessed.


2023-08-03 16:34:55,259 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Recovering checkpoints from
ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Found 0 checkpoints in
ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Trying to fetch 0 checkpoints from storage.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.util.ZooKeeperUtils [] -
Initialized DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}'
with /checkpoints.

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Running
initialization on master for  (JOBID-WW).

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Successfully ran initialization on master in 0 ms.

2023-08-03 16:34:55,313 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
Built 1 pipelined regions in 0 ms

2023-08-03 16:34:55,347 INFO
org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
0 containers from previous attempts ([]).

2023-08-03 16:34:55,347 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Recovered 0 workers from previous attempt.

2023-08-03 16:34:55,369 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
writeBatchSize=2097152}

2023-08-03 16:34:55,369 INFO
org.apache.hadoop.conf.Configuration [] -
resource-types.xml not found

2023-08-03 16:34:55,370 INFO
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
find 'resource-types.xml'.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using predefined options: DEFAULT.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
application-defined state backend: EmbeddedRocksDBStateBackend{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=8, writeBatchSize=2097152}

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.state.StateBackendLoader[] - State
backend loader loads the state backend as EmbeddedRocksDBStateBackend

2023-08-03 16:34:55,375 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Checkpoint storage is set to 'filesystem': (checkpoints
hdfs:/apps//flink/checkpoints/yy-job)

2023-08-03 16:34:55,377 INFO
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
Enabled external resources: []

2023-08-03 16:34:55,390 INFO
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
bound of the thread pool size is 500

2023-08-03 16:34:55,407 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.

2023-08-03 16:34:55,408 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Starting
job JOBID-WW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE
USED TO LAUNCH WITH]



I see some binary-looking HA files in HDFS that seem to have references to
the correct, latest checkpoint rather than the initial one.

Does anyone have an idea as to what could be causing the recovery to use
the initial checkpoint?

Many thanks
Fil


Re: Flink 1.17.1: Flink SQL multi-table join optimization

2023-08-03 Thread 周先明
A regular join stores all of its data in state by default; it is usually optimized by configuring a state TTL.

guanyq wrote on Thu, Aug 3, 2023 at 15:59:

> How can a Flink SQL join across many tables be optimized? Joining them directly can barely keep up in RuntimeExecutionMode.STREAMING mode.
>
> select
> date_format(a.create_time, 'yyyy-MM-dd HH:mm:ss') as create_time,
> b.vehicle_code,
> a.item_name,
> a.item_value,
> c.item_value as vehicle_score,
> d.current_fault,
> e.history_fault,
> f.late_mileage,
> g.fault_level_event_count,
> h.current_fault_subsystem,
> i.history_fault_subsystem
> from fault_record_subsystem a
> join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
> join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
> join fault_record_current_count d on a.vehicle_id = d.vehicle_id
> join fault_record_history_count e on a.vehicle_id = e.vehicle_id
> join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
> join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
> join fault_record_current_count_subsystem h on a.vehicle_id = h.vehicle_id
> and a.item_name = h.item_name
> join fault_record_history_count_subsystem i on a.vehicle_id = i.vehicle_id
> and a.item_name = i.item_name
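A minimal sketch of the TTL suggestion above, using Flink's standard `table.exec.state.ttl` option (the 12-hour value is illustrative; note that expired state can change join results for very late rows):

```sql
-- Expire join state that has not been accessed for 12 hours (SQL client syntax).
SET 'table.exec.state.ttl' = '12 h';
```

The same option can be set programmatically via `tableEnv.getConfig()` before the query is submitted.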


Re: Severe full GC in a job

2023-08-03 Thread yidan zhao
Check the GC logs for the GC cause.

2278179732 <2278179...@qq.com.invalid> wrote on Thu, Aug 3, 2023 at 13:59:
>
> Hi all, after the job runs for a while it occasionally shows backpressure, and full GC looks severe. Are there any good tools or experience docs for troubleshooting this? Thanks!


Re: Will all records grouped using keyBy be allocated to a single subtask?

2023-08-03 Thread xiangyu feng
Hi David,

keyBy() is implemented with hash partitioning. If you use the keyBy
function, the records for a given key will be shuffled to a downstream
operator subtask. See more in [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby

Regards,
Xiangyu

David Corley wrote on Thu, Aug 3, 2023 at 23:23:

> I have a job using the keyBy function. The job parallelism is 40. My key
> is based on a field in the records that has 2000+ possible values
> My question is for the records for a given key, will they all be sent to
> the one subtask or be distributed evenly amongst the all 40 downstream
> operator sub tasks?
> Put another way , are the partitions created by keyBy all assigned to a
> single downstream subtask?
>
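To make the deterministic routing in the answer above concrete, here is a simplified model of Flink's key-to-subtask assignment. Assumption: the real implementation (`KeyGroupRangeAssignment`) applies a murmur hash to `key.hashCode()`; plain `hashCode()` is substituted here, so the computed index is illustrative rather than Flink's actual one:

```java
public class KeyGroupSketch {
    // Simplified model of Flink's key-to-subtask routing:
    //   keyGroup = hash(key) % maxParallelism
    //   subtask  = keyGroup * parallelism / maxParallelism
    // Flink murmur-hashes key.hashCode(); plain hashCode() is used here
    // purely for illustration.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default minimum max-parallelism
        int parallelism = 40;     // as in the question
        // Every record with the same key is routed to the same subtask:
        int first = subtaskFor("key-A", maxParallelism, parallelism);
        for (int i = 0; i < 1000; i++) {
            if (subtaskFor("key-A", maxParallelism, parallelism) != first) {
                throw new AssertionError("routing must be deterministic");
            }
        }
        System.out.println("all records for key-A go to subtask " + first);
    }
}
```

So with 2000+ key values and parallelism 40, the keys (not the records) are spread across the 40 subtasks, while all records sharing one key always land on the same subtask.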


Will all records grouped using keyBy be allocated to a single subtask?

2023-08-03 Thread David Corley
I have a job using the keyBy function. The job parallelism is 40. My key is
based on a field in the records that has 2000+ possible values
My question is for the records for a given key, will they all be sent to
the one subtask or be distributed evenly amongst the all 40 downstream
operator sub tasks?
Put another way , are the partitions created by keyBy all assigned to a
single downstream subtask?


Flink 1.17.1: Flink SQL multi-table join optimization

2023-08-03 Thread guanyq
How can a Flink SQL join across many tables be optimized? Joining them directly can barely keep up in RuntimeExecutionMode.STREAMING mode.

select
date_format(a.create_time, 'yyyy-MM-dd HH:mm:ss') as create_time,
b.vehicle_code,
a.item_name,
a.item_value,
c.item_value as vehicle_score,
d.current_fault,
e.history_fault,
f.late_mileage,
g.fault_level_event_count,
h.current_fault_subsystem,
i.history_fault_subsystem
from fault_record_subsystem a
join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
join fault_record_current_count d on a.vehicle_id = d.vehicle_id
join fault_record_history_count e on a.vehicle_id = e.vehicle_id
join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
join fault_record_current_count_subsystem h on a.vehicle_id = h.vehicle_id and 
a.item_name = h.item_name
join fault_record_history_count_subsystem i on a.vehicle_id = i.vehicle_id and 
a.item_name = i.item_name