Operator/Autoscaler/Autotuner tuning behavior question

2024-05-08 Thread Maxim Senin via user
Hello.

I have some questions about memory autotuning in the Operator.

1. Does the autotuner try to upgrade the job with more memory if it 
intercepts an OutOfMemoryError? Say I initially provided too little memory in the TM 
`resource` block - will the job fail and stop while initializing, or will the operator 
try to increase memory after it catches the OutOfMemoryError?
2. When the autoscaler scales TMs from, say, 1 to 2 instances with higher 
parallelism, does it also double the memory and CPU requested in each iteration of 
upscaling, or will it have already measured the RAM requirements while it was 
collecting stats and request just what it thinks is needed? I see there’s a 
config option that increases memory when scaling down so that the job 
doesn’t fail when fewer TMs take over the work that more TMs used to do, 
so I was wondering whether the opposite happens when scaling up.
3. Will the autotuner ever increase requested memory beyond what was initially 
requested in the TM’s `resource` block in the Deployment CRD? Same for CPU?
4. Does the operator care about available resources in k8s, or does it just make an 
“optimistic” request and hope it will be granted? What happens if it requests 
more than is available? Keep retrying? Stay in a pending state waiting for 
resources? Time out? Exit? Is there a rollback and retry with a smaller amount of 
resources if a request with larger demands fails? I see there’s an 
option that, when enabled, refreshes information about available resources 
periodically, which should prevent or reduce inadvertently greedy requests. But 
what’s the strategy used by the operator if the request is already too large to 
handle?

Thanks a lot!
/Maxim





Re: Question around manually setting Flink jobId

2024-03-18 Thread Venkatakrishnan Sowrirajan
Thanks for the response, Asimansu.

I should have been a bit clearer and shared some additional context on
our internal deployment. Currently, we are running *Flink in YARN
application mode* for *batch* execution purposes (we also run it for stream
execution). In YARN application mode, the client only gets the YARN
application ID back when the Flink application is submitted; the JobManager (as
part of the Application Master) generates the JobId when it constructs the
`StreamGraph`, so the Flink JobId is only logged in the JM logs.

Why do we need the Flink Job Id in the Flink Client?

- Currently, Flink job archives are stored by Flink JobId, so to
show a link to the completed application (the Flink UI in the HistoryServer)
in the orchestrator (e.g. Airflow), the Flink JobId is required. Airflow
currently only has access to the Flink client logs.

Hope that helps.

Regards
Venkata krishnan


On Thu, Mar 14, 2024 at 6:22 PM Asimansu Bera 
wrote:

> Hello Venkat,
>
> There are few ways to get the JobID from the client side. JobID is alpha
> numeric as 9eec4d17246b5ff965a43082818a3336.
> When you submit the job using flink command line client , Job is returned
> as
>
> Job has been submitted with JobID 9eec4d17246b5ff965a43082818a3336
>
> 1. using below command , you get a list of jobs which are running with
> jobIDs
> $flink job list
>
> 2. using REST API
> http://localhost:8081/jobs/
> 
>
> Hope this helps.
>
> Regards,
> Asimansu
>
>
> On Thu, Mar 14, 2024 at 3:11 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu> wrote:
>
>> Junrui,
>>
>> Thanks for your answer for the above questions.
>>
>> Allison and I work together on Flink. One of the main questions is, is
>> there an easy way to get the Flink "JobID" from the Flink client side?
>> Without the "JobID", users have no way to access Flink HistoryServer
>> other than searching through the list of applications in FlinkHistoryServer
>> UI. Btw, in this case we are running Flink in batch execution mode.
>>
>> Regards
>> Venkat
>>
>>
>> On Wed, Mar 13, 2024 at 7:58 PM Junrui Lee  wrote:
>>
>>> Hi Allison,
>>>
>>> The PIPELINE_FIXED_JOB_ID configuration option is not intended for
>>> public use. IIUC, the only way to manually specify the jobId is submitting
>>> a job through the JAR RUN REST API, where you can provide the jobId in the
>>> request body (
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
>>> 
>>> ).
>>>
>>> Best,
>>> Junrui
>>>
>>> Allison Chang via user  于2024年3月14日周四 08:16写道:
>>>
 Hi,

 I was wondering if there is any way to manually set the jobID for the
 jobGraph. I noticed that there is a configuration for
 PIPELINE_FIXED_JOB_ID, but there doesn't seem to be a way to set it via
 config with the StreamingJobGraphGenerator.java. Would appreciate any
 assistance if anyone has done something similar.

 Best,

 *Allison Chang*





Re: Question around manually setting Flink jobId

2024-03-14 Thread Asimansu Bera
Hello Venkat,

There are a few ways to get the JobID from the client side. The JobID is
alphanumeric, e.g. 9eec4d17246b5ff965a43082818a3336.
When you submit the job using the flink command line client, the JobID is
returned as

Job has been submitted with JobID 9eec4d17246b5ff965a43082818a3336

1. Using the command below, you get a list of running jobs with their
JobIDs:
$ flink list

2. Using the REST API:
http://localhost:8081/jobs/
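
For illustration, a minimal Java sketch of option 2 (the host/port and the plain
printing are assumptions; a real client would parse the JSON with a proper library):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListJobIds {
    public static void main(String[] args) throws Exception {
        // Assumes the JobManager REST endpoint is reachable at localhost:8081.
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/"))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        // The body is JSON, roughly {"jobs":[{"id":"<job-id>","status":"RUNNING"}, ...]};
        // here it is just printed, a real client would parse it.
        System.out.println(response.body());
    }
}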

Hope this helps.

Regards,
Asimansu


On Thu, Mar 14, 2024 at 3:11 PM Venkatakrishnan Sowrirajan 
wrote:

> Junrui,
>
> Thanks for your answer for the above questions.
>
> Allison and I work together on Flink. One of the main questions is, is
> there an easy way to get the Flink "JobID" from the Flink client side?
> Without the "JobID", users have no way to access Flink HistoryServer
> other than searching through the list of applications in FlinkHistoryServer
> UI. Btw, in this case we are running Flink in batch execution mode.
>
> Regards
> Venkat
>
>
> On Wed, Mar 13, 2024 at 7:58 PM Junrui Lee  wrote:
>
>> Hi Allison,
>>
>> The PIPELINE_FIXED_JOB_ID configuration option is not intended for public
>> use. IIUC, the only way to manually specify the jobId is submitting a job
>> through the JAR RUN REST API, where you can provide the jobId in the
>> request body (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
>> 
>> ).
>>
>> Best,
>> Junrui
>>
>> Allison Chang via user  于2024年3月14日周四 08:16写道:
>>
>>> Hi,
>>>
>>> I was wondering if there is any way to manually set the jobID for the
>>> jobGraph. I noticed that there is a configuration for
>>> PIPELINE_FIXED_JOB_ID, but there doesn't seem to be a way to set it via
>>> config with the StreamingJobGraphGenerator.java. Would appreciate any
>>> assistance if anyone has done something similar.
>>>
>>> Best,
>>>
>>> *Allison Chang*
>>>
>>>
>>>


Re: Question around manually setting Flink jobId

2024-03-14 Thread Venkatakrishnan Sowrirajan
Junrui,

Thanks for your answer to the above questions.

Allison and I work together on Flink. One of the main questions is: is
there an easy way to get the Flink "JobID" from the Flink client side?
Without the "JobID", users have no way to access the Flink HistoryServer
other than searching through the list of applications in the HistoryServer
UI. Btw, in this case we are running Flink in batch execution mode.

Regards
Venkat


On Wed, Mar 13, 2024 at 7:58 PM Junrui Lee  wrote:

> Hi Allison,
>
> The PIPELINE_FIXED_JOB_ID configuration option is not intended for public
> use. IIUC, the only way to manually specify the jobId is submitting a job
> through the JAR RUN REST API, where you can provide the jobId in the
> request body (
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
> 
> ).
>
> Best,
> Junrui
>
> Allison Chang via user  于2024年3月14日周四 08:16写道:
>
>> Hi,
>>
>> I was wondering if there is any way to manually set the jobID for the
>> jobGraph. I noticed that there is a configuration for
>> PIPELINE_FIXED_JOB_ID, but there doesn't seem to be a way to set it via
>> config with the StreamingJobGraphGenerator.java. Would appreciate any
>> assistance if anyone has done something similar.
>>
>> Best,
>>
>> *Allison Chang*
>>
>>
>>


Re: Question around manually setting Flink jobId

2024-03-13 Thread Junrui Lee
Hi Allison,

The PIPELINE_FIXED_JOB_ID configuration option is not intended for public
use. IIUC, the only way to manually specify the jobId is submitting a job
through the JAR RUN REST API, where you can provide the jobId in the
request body (
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
).
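
As a hedged sketch of such a submission (the jar id, entry class, and especially the
name of the job-id field below are placeholders; the linked REST docs are the authority
on the exact request-body schema):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitJarWithJobId {
    public static void main(String[] args) throws Exception {
        String jarId = "<uploaded-jar-id>"; // placeholder for the id of an uploaded jar
        // Field names (entryClass, jobId) are assumptions for illustration only;
        // verify them against the /jars/:jarid/run section of the REST API docs.
        String body = "{\"entryClass\":\"com.example.MyJob\","
                + "\"jobId\":\"9eec4d17246b5ff965a43082818a3336\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // on success, contains the id of the started job
    }
}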

Best,
Junrui

Allison Chang via user  于2024年3月14日周四 08:16写道:

> Hi,
>
> I was wondering if there is any way to manually set the jobID for the
> jobGraph. I noticed that there is a configuration for
> PIPELINE_FIXED_JOB_ID, but there doesn't seem to be a way to set it via
> config with the StreamingJobGraphGenerator.java. Would appreciate any
> assistance if anyone has done something similar.
>
> Best,
>
> *Allison Chang*
>
>
>


Question around manually setting Flink jobId

2024-03-13 Thread Allison Chang via user
Hi,

I was wondering if there is any way to manually set the jobID for the jobGraph. 
I noticed that there is a configuration for PIPELINE_FIXED_JOB_ID, but there 
doesn't seem to be a way to set it via config with the 
StreamingJobGraphGenerator.java. Would appreciate any assistance if anyone has 
done something similar.

Best,

Allison Chang




Re: Question about time-based operators with RocksDB backend

2024-03-06 Thread xia rui
Hi Gabriele, using (or extending) the window operator provided by Flink is a
better idea. A window operator in Flink manages two types of state:

   - Window state: accumulates data for windows, and provides that data to the window
   function when a window reaches its end time.
   - Timer state: stores the end times of windows, and provides the minimum
   end time to the window operator.


The source code is mainly
in org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.

   - "InternalAppendingState windowState" refers to the
   window state.
   - "InternalTimerService internalTimerService" refers to the timer
   state.

Best regards
Rui Xia

On Mon, Mar 4, 2024 at 7:39 PM Gabriele Mencagli <
gabriele.menca...@gmail.com> wrote:

> Dear Flink Community,
>
> I am using Flink with the DataStream API and operators implemented using
> RichedFunctions. I know that Flink provides a set of window-based operators
> with time-based semantics and tumbling/sliding windows.
>
> By reading the Flink documentation, I understand that there is the
> possibility to change the memory backend utilized for storing the in-flight
> state of the operators. For example, using RocksDB for this purpose to cope
> with a larger-than-memory state. If I am not wrong, to transparently change
> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
> to access the state. For example, the Keyed State API with different
> abstractions such as ValueState, ListState, etc... as reported here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>
> .
>
> My question is related to the utilization of time-based window operators
> with the RocksDB backend. Suppose for example very large temporal windows
> with a huge number of keys in the stream. I am wondering if there is a
> possibility to use the built-in window operators of Flink (e.g., with an
> AggregateFunction or a more generic ProcessWindowFunction as here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>)
> transparently with RocksDB support as a state back-end, or if I have to
> develop the window operator in a raw manner using the Keyed State API
> (e.g., ListState, AggregateState) for this purpose by implementing the
> underlying window logic manually in the code of RichedFunction of the
> operator (e.g., a FlatMap).
> Thanks for your support,
>
> --
> Gabriele Mencagli
>
>


Re: Question about time-based operators with RocksDB backend

2024-03-05 Thread Jinzhong Li
Hi Gabriele,

The keyed state APIs (ValueState, ListState, etc.) are supported by all
types of state backend (hashmap, rocksdb, etc.), and the built-in window
operators are implemented with these state APIs internally. So you can use
these built-in operators/functions with the RocksDB state backend right out
of the box [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#setting-default-state-backend
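
To make that concrete, a minimal sketch (not from this thread; the data, key, and
window size are illustrative, and it assumes the flink-statebackend-rocksdb dependency
is on the classpath):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RocksDbWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep keyed/window state in RocksDB; equivalently set "state.backend: rocksdb"
        // in the Flink configuration, as described in [1].
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                    .withTimestampAssigner((event, ts) -> event.f1 * 1000L))
            .keyBy(value -> value.f0)
            // Built-in event-time tumbling window; its state lives in RocksDB transparently.
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .sum(1)
            .print();

        env.execute("rocksdb-window-sketch");
    }
}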

Best,
Jinzhong Li


On Tue, Mar 5, 2024 at 10:59 AM Zakelly Lan  wrote:

> Hi Gabriele,
>
> Quick answer: You can use the built-in window operators which have been
> integrated with state backends including RocksDB.
>
>
> Thanks,
> Zakelly
>
> On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
> wrote:
>
>> Hi Gabriele,
>>
>> I'd recommend extending the existing window function whenever possible,
>> as Flink will automatically cover state management for you and no need to
>> be concerned with state backend details. Incremental aggregation for reduce
>> state size is also out of the box if your usage can be satisfied with the
>> reduce/aggregate function pattern, which is important for large windows.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Gabriele Mencagli 
>> *Sent:* Monday, March 4, 2024 19:38
>> *To:* user@flink.apache.org 
>> *Subject:* Question about time-based operators with RocksDB backend
>>
>>
>> Dear Flink Community,
>>
>> I am using Flink with the DataStream API and operators implemented using
>> RichedFunctions. I know that Flink provides a set of window-based operators
>> with time-based semantics and tumbling/sliding windows.
>>
>> By reading the Flink documentation, I understand that there is the
>> possibility to change the memory backend utilized for storing the in-flight
>> state of the operators. For example, using RocksDB for this purpose to cope
>> with a larger-than-memory state. If I am not wrong, to transparently change
>> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
>> to access the state. For example, the Keyed State API with different
>> abstractions such as ValueState, ListState, etc... as reported here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>
>> .
>>
>> My question is related to the utilization of time-based window operators
>> with the RocksDB backend. Suppose for example very large temporal windows
>> with a huge number of keys in the stream. I am wondering if there is a
>> possibility to use the built-in window operators of Flink (e.g., with an
>> AggregateFunction or a more generic ProcessWindowFunction as here
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>)
>> transparently with RocksDB support as a state back-end, or if I have to
>> develop the window operator in a raw manner using the Keyed State API
>> (e.g., ListState, AggregateState) for this purpose by implementing the
>> underlying window logic manually in the code of RichedFunction of the
>> operator (e.g., a FlatMap).
>> Thanks for your support,
>>
>> --
>> Gabriele Mencagli
>>
>>


Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zakelly Lan
Hi Gabriele,

Quick answer: You can use the built-in window operators which have been
integrated with state backends including RocksDB.


Thanks,
Zakelly

On Tue, Mar 5, 2024 at 10:33 AM Zhanghao Chen 
wrote:

> Hi Gabriele,
>
> I'd recommend extending the existing window function whenever possible, as
> Flink will automatically cover state management for you and no need to be
> concerned with state backend details. Incremental aggregation for reduce
> state size is also out of the box if your usage can be satisfied with the
> reduce/aggregate function pattern, which is important for large windows.
>
> Best,
> Zhanghao Chen
> --
> *From:* Gabriele Mencagli 
> *Sent:* Monday, March 4, 2024 19:38
> *To:* user@flink.apache.org 
> *Subject:* Question about time-based operators with RocksDB backend
>
>
> Dear Flink Community,
>
> I am using Flink with the DataStream API and operators implemented using
> RichedFunctions. I know that Flink provides a set of window-based operators
> with time-based semantics and tumbling/sliding windows.
>
> By reading the Flink documentation, I understand that there is the
> possibility to change the memory backend utilized for storing the in-flight
> state of the operators. For example, using RocksDB for this purpose to cope
> with a larger-than-memory state. If I am not wrong, to transparently change
> the backend (e.g., from in-memory to RocksDB) we have to use a proper API
> to access the state. For example, the Keyed State API with different
> abstractions such as ValueState, ListState, etc... as reported here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>
> .
>
> My question is related to the utilization of time-based window operators
> with the RocksDB backend. Suppose for example very large temporal windows
> with a huge number of keys in the stream. I am wondering if there is a
> possibility to use the built-in window operators of Flink (e.g., with an
> AggregateFunction or a more generic ProcessWindowFunction as here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>)
> transparently with RocksDB support as a state back-end, or if I have to
> develop the window operator in a raw manner using the Keyed State API
> (e.g., ListState, AggregateState) for this purpose by implementing the
> underlying window logic manually in the code of RichedFunction of the
> operator (e.g., a FlatMap).
> Thanks for your support,
>
> --
> Gabriele Mencagli
>
>


Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zhanghao Chen
Hi Gabriele,

I'd recommend extending the existing window functions whenever possible, as 
Flink will automatically cover state management for you, with no need to be 
concerned with state backend details. Incremental aggregation to reduce state 
size also comes out of the box if your usage can be satisfied with the 
reduce/aggregate function pattern, which is important for large windows.
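
For reference, a minimal sketch of that reduce/aggregate pattern (an illustrative
average computation, not code from this thread); with an AggregateFunction only the
small accumulator is kept in window state rather than every element:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Incrementally averages Long values; the accumulator is a (sum, count) pair,
// so window state stays tiny even for very large windows.
public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

It would be plugged into a window via something like
stream.keyBy(...).window(...).aggregate(new AverageAggregate()).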

Best,
Zhanghao Chen

From: Gabriele Mencagli 
Sent: Monday, March 4, 2024 19:38
To: user@flink.apache.org 
Subject: Question about time-based operators with RocksDB backend


Dear Flink Community,

I am using Flink with the DataStream API and operators implemented using 
RichedFunctions. I know that Flink provides a set of window-based operators 
with time-based semantics and tumbling/sliding windows.

By reading the Flink documentation, I understand that there is the possibility 
to change the memory backend utilized for storing the in-flight state of the 
operators. For example, using RocksDB for this purpose to cope with a 
larger-than-memory state. If I am not wrong, to transparently change the 
backend (e.g., from in-memory to RocksDB) we have to use a proper API to access 
the state. For example, the Keyed State API with different abstractions such as 
ValueState, ListState, etc... as reported 
here<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>.

My question is related to the utilization of time-based window operators with 
the RocksDB backend. Suppose for example very large temporal windows with a 
huge number of keys in the stream. I am wondering if there is a possibility to 
use the built-in window operators of Flink (e.g., with an AggregateFunction or 
a more generic ProcessWindowFunction as 
here<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>)
 transparently with RocksDB support as a state back-end, or if I have to 
develop the window operator in a raw manner using the Keyed State API (e.g., 
ListState, AggregateState) for this purpose by implementing the underlying 
window logic manually in the code of RichedFunction of the operator (e.g., a 
FlatMap).

Thanks for your support,

--
Gabriele Mencagli


Question about time-based operators with RocksDB backend

2024-03-04 Thread Gabriele Mencagli

Dear Flink Community,

I am using Flink with the DataStream API and operators implemented using 
RichFunctions. I know that Flink provides a set of window-based 
operators with time-based semantics and tumbling/sliding windows.


By reading the Flink documentation, I understand that there is the 
possibility to change the memory backend utilized for storing the 
in-flight state of the operators. For example, using RocksDB for this 
purpose to cope with a larger-than-memory state. If I am not wrong, to 
transparently change the backend (e.g., from in-memory to RocksDB) we 
have to use a proper API to access the state. For example, the Keyed 
State API with different abstractions such as ValueState, 
ListState, etc... as reported here 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>.


My question is related to the utilization of time-based window operators 
with the RocksDB backend. Suppose for example very large temporal 
windows with a huge number of keys in the stream. I am wondering if 
there is a possibility to use the built-in window operators of Flink 
(e.g., with an AggregateFunction or a more generic ProcessWindowFunction 
as here 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>) 
transparently with RocksDB support as the state backend, or if I have to 
develop the window operator in a raw manner using the Keyed State API 
(e.g., ListState, AggregatingState) for this purpose by implementing the 
underlying window logic manually in the code of a RichFunction in the 
operator (e.g., a FlatMap).


Thanks for your support,

--
Gabriele Mencagli


[Meta question] Sharing blog posts

2024-02-16 Thread Robin Moffatt via user
Hi,
I have a netiquette question - is it ok to share blog posts here that
are specific to Apache Flink and not vendor-focussed?

thanks,

Robin.


Re: keyby mapState use question

2023-12-10 Thread Zakelly Lan
Hi,

This should not happen. I guess the `onTimer` and `processElement` you are
testing are triggered under different keyBy keys. Note that keyed
state is partitioned by the keyBy key first, so when querying or setting
the state you are only manipulating the partition of the current key, which does
not affect other partitions. Please check that the partitioned keys (keyBy keys) are
the same in those two functions using `ctx.getCurrentKey()`. Hope
this helps.
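
As an illustration, a simplified KeyedProcessFunction sketch (names are made up, and it
uses a plain keyed function rather than the KeyedBroadcastProcessFunction from the
question): state written in onTimer for key A is only visible in processElement while
the current key is A.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerStateFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) {
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("timer-state", String.class, Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Only sees entries written while the SAME keyBy key was active (e.g. in onTimer below).
        out.collect("key=" + ctx.getCurrentKey() + " lastTimer=" + mapState.get("lastTimer"));
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Written into the state partition of ctx.getCurrentKey(); other keys cannot read it.
        mapState.put("lastTimer", timestamp);
    }
}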


Best,
Zakelly

On Thu, Dec 7, 2023 at 4:48 PM Jake.zhang  wrote:

> Hi all:
>
> KeyBy process function
>
> EventKeyedBroadcastProcessFunction {
>
> private transient mapstate = null;
>
> public void open(Configuration parameters) throws Exception {
>  // initial map state
> }
>
> public void processElement() {
>  // can't get onTimer() function set state key value
>}
>
>public void onTimer() {
>   // set map state  key value first
>}
>
> }
>
> why processElement function can't get onTimer function sets  value ?
>
> thanks.
>
>


keyby mapState use question

2023-12-07 Thread Jake.zhang
Hi all:


KeyBy process function


EventKeyedBroadcastProcessFunction {

    private transient MapState<...> mapState = null;

    public void open(Configuration parameters) throws Exception {
        // initialize the map state
    }

    public void processElement() {
        // can't read the state key/value that onTimer() set
    }

    public void onTimer() {
        // sets a map state key/value first
    }
}


Why can't the processElement function see the value that the onTimer function sets?


thanks.

Re: [Question] How to scale application based on 'reactive' mode

2023-10-23 Thread Dennis Jung
Hello,
Thanks for feedback. I'll start with these.

Regards

On Thu, Sep 7, 2023 at 7:08 PM, Gyula Fóra wrote:

> Jung,
> I don't want to sound unhelpful, but I think the best thing for you to do
> is simply to try these different models in your local env.
> It should be very easy to get started with the Kubernetes Operator on
> Kind/Minikube (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
> )
>
> It's very difficult to answer these questions fully here. Try the
> different modes, observe what happens, read the docs and you will get all
> the answers.
>
> Gyula
>
> On Thu, Sep 7, 2023 at 10:11 AM Dennis Jung  wrote:
>
>> Hello Chen,
>> Thanks for your reply! I have further questions as following...
>>
>> 1. In case of non-reactive mode in Flink 1.18, if the autoscaler adjusts
>> parallelism, what is the difference by using 'reactive' mode?
>> 2. In case if I use Flink 1.15~1.17 without autoscaler, is the difference
>> of using 'reactive' mode is, changing parallelism dynamically by change of
>> TM number (manually, or by custom scaler)?
>>
>> Regards,
>> Jung
>>
>>
>> 2023년 9월 5일 (화) 오후 3:59, Chen Zhanghao 님이 작성:
>>
>>> Hi Dennis,
>>>
>>>
>>>1. In Flink 1.18 + non-reactive mode, autoscaler adjusts the job's
>>>parallelism and the job will request for extra TMs if the current ones
>>>cannot satisfy its need and redundant TMs will be released automatically
>>>later for being idle. In other words, parallelism changes cause TM number
>>>change.
>>>2. The core metrics used is busy time (the amount of time spent on
>>>task processing per 1 second = 1 s - backpressured time - idle time), it 
>>> is
>>>considered to be superior as it counts I/O cost etc into account as well.
>>>Also, the metrics is on a per-task granularity and allows us to identify
>>>bottleneck tasks.
>>>3. Autoscaler feature currently only works for K8s opeartor + native
>>>K8s mode.
>>>
>>>
>>> Best,
>>> Zhanghao Chen
>>> --
>>> *发件人:* Dennis Jung 
>>> *发送时间:* 2023年9月2日 12:58
>>> *收件人:* Gyula Fóra 
>>> *抄送:* user@flink.apache.org 
>>> *主题:* Re: [Question] How to scale application based on 'reactive' mode
>>>
>>> Hello,
>>> Thanks for your notice.
>>>
>>> 1. In "Flink 1.18 + non-reactive", is parallelism being changed by the
>>> number of TM?
>>> 2. In the document(
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/),
>>> it said "we are not using any container memory / CPU utilization metrics
>>> directly here". Which metrics are these using internally?
>>> 3. I'm using standalone k8s(
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
>>> for deployment. Is autoscaler features only available by using the "flink
>>> k8s operator"(sorry I don't understand this clearly yet...)?
>>>
>>> Regards
>>>
>>>
>>> 2023년 9월 1일 (금) 오후 10:20, Gyula Fóra 님이 작성:
>>>
>>> Pretty much, except that with Flink 1.18 autoscaler can scale the job in
>>> place without restarting the JM (even without reactive mode )
>>>
>>> So actually best option is autoscaler with Flink 1.18 native mode (no
>>> reactive)
>>>
>>> Gyula
>>>
>>> On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:
>>>
>>> Thanks for feedback.
>>> Could you check whether I understand correctly?
>>>
>>> *Only using 'reactive' mode:*
>>> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
>>> start'), parallelism will be increased. For example, when job parallelism
>>> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
>>> parallelism will be 2.
>>> But the number of TM is not being controlled automatically.
>>>
>>> *Autoscaler + non-reactive:*
>>> It can flexibilly control the number of TM by several metrics(CPU usage,
>>> throughput, ...), and JobManager will be restarted when scaling. But job
>>> parallelism is the same after the number of TM has been changed.
>>>
>>> *Autoscaler + 'reactive' mode*:
>>> It can control numbers of TM by metric, and increase/decrease job
>>>

pyflink aggfunction in tvf question

2023-09-13 Thread faronzz
hi flink community~
   I came across a problem I didn't understand: I can't use a pyflink
aggregate function properly in a window TVF. The following work fine:
- java aggregate functions
- flink built-in (system) aggregate functions
- group windows (not window TVFs)
I want to know if this is a bug or if I'm using it the wrong way? Thanks!







from datetime import datetime, timedelta


from pyflink.table import AggregateFunction
from pyflink.common.typeinfo import Types
from pyflink.common import Row

from pyflink.table import Schema, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment


class Sum0(AggregateFunction):

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        return Row(0)

    def accumulate(self, accumulator, *args):
        if args[0] is not None:
            accumulator[0] += args[0]

    def retract(self, accumulator, *args):
        if args[0] is not None:
            accumulator[0] -= args[0]

    def merge(self, accumulator, accumulators):
        for acc in accumulators:
            accumulator[0] += acc[0]

    def get_result_type(self):
        return "BIGINT"

    def get_accumulator_type(self):
        return 'ROW'


def test_py_udf_kafka():
    # stream
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    ds = env.from_collection(
        collection=[(1, 2, "Lee", datetime.now() - timedelta(hours=4)),
                    (2, 3, "Lee", datetime.now() - timedelta(hours=4)),
                    (3, 4, "Jay", datetime.now() - timedelta(hours=4)),
                    (5, 6, "Jay", datetime.now() - timedelta(hours=2)),
                    (7, 8, "Lee", datetime.now())],
        type_info=Types.ROW([Types.INT(),
                             Types.INT(),
                             Types.STRING(),
                             Types.SQL_TIMESTAMP()]))

    table_schema = Schema.new_builder() \
        .column("f0", "INT") \
        .column("f1", "INT") \
        .column("f2", "STRING") \
        .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))") \
        .watermark("rowtime", "rowtime - INTERVAL '1' SECOND") \
        .build()

    ts = table_env.from_data_stream(ds, table_schema) \
        .alias("value", "count", "name", "rowtime")

    print("schema desc")
    ts.print_schema()

    sql_sink_dll_1 = """CREATE TABLE kafka_test(
        `name` string, `agg_data` bigint)
        with (
            'connector' = 'kafka',
            'topic' = 'test_java2',
            'properties.bootstrap.servers' = 'agent3:9092',
            'value.format' = 'json'
        );"""

    table_env.execute_sql(sql_sink_dll_1)
    table_env.create_temporary_view("source", ts)
    table_env.create_temporary_function(
        "sum_udf",
        Sum0())
    sql_query_system = """
        select name, sum(`value`) as agg_data from
        TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime), INTERVAL '1' HOURS))
        group by window_start, window_end, name
        """
    sql_query = """
        select name, sum_udf(`value`) as agg_data from
        TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime), INTERVAL '1' HOURS))
        group by window_start, window_end, name
        """
    print(table_env.explain_sql(sql_query))
    table_env.sql_query(sql_query).execute().print()
    table_env.sql_query(sql_query).execute_insert("kafka_test").wait()


if __name__ == "__main__":
    test_py_udf_kafka()














Re: [Question] How to scale application based on 'reactive' mode

2023-09-07 Thread Gyula Fóra
Jung,
I don't want to sound unhelpful, but I think the best thing for you to do
is simply to try these different modes in your local env.
It should be very easy to get started with the Kubernetes Operator on
Kind/Minikube (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
)

It's very difficult to answer these questions fully here. Try the different
modes, observe what happens, read the docs and you will get all the answers.

Gyula

On Thu, Sep 7, 2023 at 10:11 AM Dennis Jung  wrote:

> Hello Chen,
> Thanks for your reply! I have further questions as following...
>
> 1. In case of non-reactive mode in Flink 1.18, if the autoscaler adjusts
> parallelism, what is the difference by using 'reactive' mode?
> 2. In case if I use Flink 1.15~1.17 without autoscaler, is the difference
> of using 'reactive' mode is, changing parallelism dynamically by change of
> TM number (manually, or by custom scaler)?
>
> Regards,
> Jung
>
>
> 2023년 9월 5일 (화) 오후 3:59, Chen Zhanghao 님이 작성:
>
>> Hi Dennis,
>>
>>
>>1. In Flink 1.18 + non-reactive mode, autoscaler adjusts the job's
>>parallelism and the job will request for extra TMs if the current ones
>>cannot satisfy its need and redundant TMs will be released automatically
>>later for being idle. In other words, parallelism changes cause TM number
>>change.
>>2. The core metrics used is busy time (the amount of time spent on
>>task processing per 1 second = 1 s - backpressured time - idle time), it 
>> is
>>considered to be superior as it counts I/O cost etc into account as well.
>>Also, the metrics is on a per-task granularity and allows us to identify
>>bottleneck tasks.
>>3. Autoscaler feature currently only works for K8s opeartor + native
>>K8s mode.
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *发件人:* Dennis Jung 
>> *发送时间:* 2023年9月2日 12:58
>> *收件人:* Gyula Fóra 
>> *抄送:* user@flink.apache.org 
>> *主题:* Re: [Question] How to scale application based on 'reactive' mode
>>
>> Hello,
>> Thanks for your notice.
>>
>> 1. In "Flink 1.18 + non-reactive", is parallelism being changed by the
>> number of TM?
>> 2. In the document(
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/),
>> it said "we are not using any container memory / CPU utilization metrics
>> directly here". Which metrics are these using internally?
>> 3. I'm using standalone k8s(
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
>> for deployment. Is autoscaler features only available by using the "flink
>> k8s operator"(sorry I don't understand this clearly yet...)?
>>
>> Regards
>>
>>
>> 2023년 9월 1일 (금) 오후 10:20, Gyula Fóra 님이 작성:
>>
>> Pretty much, except that with Flink 1.18 autoscaler can scale the job in
>> place without restarting the JM (even without reactive mode )
>>
>> So actually best option is autoscaler with Flink 1.18 native mode (no
>> reactive)
>>
>> Gyula
>>
>> On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:
>>
>> Thanks for feedback.
>> Could you check whether I understand correctly?
>>
>> *Only using 'reactive' mode:*
>> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
>> start'), parallelism will be increased. For example, when job parallelism
>> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
>> parallelism will be 2.
>> But the number of TM is not being controlled automatically.
>>
>> *Autoscaler + non-reactive:*
>> It can flexibilly control the number of TM by several metrics(CPU usage,
>> throughput, ...), and JobManager will be restarted when scaling. But job
>> parallelism is the same after the number of TM has been changed.
>>
>> *Autoscaler + 'reactive' mode*:
>> It can control numbers of TM by metric, and increase/decrease job
>> parallelism by changing TM.
>>
>> Regards,
>> Jung
>>
>> 2023년 9월 1일 (금) 오후 8:16, Gyula Fóra 님이 작성:
>>
>> I would look at reactive scaling as a way to increase / decrease
>> parallelism.
>>
>> It’s not a way to automatically decide when to actually do it as you need
>> to create new TMs .
>>
>> The autoscaler could use reactive mode to change the parallelism but you
>> need the autoscaler itself to decide when new resources should be added
>>

Re: [Question] How to scale application based on 'reactive' mode

2023-09-07 Thread Dennis Jung
Hello Chen,
Thanks for your reply! I have further questions as following...

1. In the case of non-reactive mode in Flink 1.18, if the autoscaler adjusts
parallelism, what is the difference when using 'reactive' mode?
2. In case I use Flink 1.15~1.17 without the autoscaler, is the difference
of using 'reactive' mode that parallelism changes dynamically with the
number of TMs (changed manually, or by a custom scaler)?

Regards,
Jung


On Tue, Sep 5, 2023 at 3:59 PM, Chen Zhanghao wrote:

> Hi Dennis,
>
>
>1. In Flink 1.18 + non-reactive mode, autoscaler adjusts the job's
>parallelism and the job will request for extra TMs if the current ones
>cannot satisfy its need and redundant TMs will be released automatically
>later for being idle. In other words, parallelism changes cause TM number
>change.
>2. The core metrics used is busy time (the amount of time spent on
>task processing per 1 second = 1 s - backpressured time - idle time), it is
>considered to be superior as it counts I/O cost etc into account as well.
>Also, the metrics is on a per-task granularity and allows us to identify
>bottleneck tasks.
>3. Autoscaler feature currently only works for K8s opeartor + native
>K8s mode.
>
>
> Best,
> Zhanghao Chen
> --
> *发件人:* Dennis Jung 
> *发送时间:* 2023年9月2日 12:58
> *收件人:* Gyula Fóra 
> *抄送:* user@flink.apache.org 
> *主题:* Re: [Question] How to scale application based on 'reactive' mode
>
> Hello,
> Thanks for your notice.
>
> 1. In "Flink 1.18 + non-reactive", is parallelism being changed by the
> number of TM?
> 2. In the document(
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/),
> it said "we are not using any container memory / CPU utilization metrics
> directly here". Which metrics are these using internally?
> 3. I'm using standalone k8s(
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
> for deployment. Is autoscaler features only available by using the "flink
> k8s operator"(sorry I don't understand this clearly yet...)?
>
> Regards
>
>
> 2023년 9월 1일 (금) 오후 10:20, Gyula Fóra 님이 작성:
>
> Pretty much, except that with Flink 1.18 autoscaler can scale the job in
> place without restarting the JM (even without reactive mode )
>
> So actually best option is autoscaler with Flink 1.18 native mode (no
> reactive)
>
> Gyula
>
> On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:
>
> Thanks for feedback.
> Could you check whether I understand correctly?
>
> *Only using 'reactive' mode:*
> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
> start'), parallelism will be increased. For example, when job parallelism
> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
> parallelism will be 2.
> But the number of TM is not being controlled automatically.
>
> *Autoscaler + non-reactive:*
> It can flexibilly control the number of TM by several metrics(CPU usage,
> throughput, ...), and JobManager will be restarted when scaling. But job
> parallelism is the same after the number of TM has been changed.
>
> *Autoscaler + 'reactive' mode*:
> It can control numbers of TM by metric, and increase/decrease job
> parallelism by changing TM.
>
> Regards,
> Jung
>
> 2023년 9월 1일 (금) 오후 8:16, Gyula Fóra 님이 작성:
>
> I would look at reactive scaling as a way to increase / decrease
> parallelism.
>
> It’s not a way to automatically decide when to actually do it as you need
> to create new TMs .
>
> The autoscaler could use reactive mode to change the parallelism but you
> need the autoscaler itself to decide when new resources should be added
>
> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>
> For now, the thing I've found about 'reactive' mode is that it
> automatically adjusts 'job parallelism' when TaskManager is
> increased/decreased.
>
>
> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>
> Is there some other feature that only 'reactive' mode offers for scaling?
>
> Thanks.
> Regards.
>
>
>
> 2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:
>
> Hello,
> Thank you for your response. I have few more questions in following:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>
> *Reactive Mode configures a job so that it always uses all resources
> available in the cluster. Adding a TaskManager will scale up your job,
> removing resources will scale it down. Flink will manage the parallelism of
> the job, always setting it to the highest possible values.*
> => Do

Re: Question regarding asyncIO timeout

2023-09-06 Thread liu ron
Hi, Leon

> Besides that, Do you know if the async timeout is actually a global
timeout? meaning it accounts for the time of each attempt call plus any
interval time in between.

Yes, the timeout is the total timeout; see [1][2] for more detail.


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2]
https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java#L209
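
For reference, a hedged wiring sketch (stream/function names and all numeric values are
illustrative): the 30 s below bounds the whole element, covering every retry attempt and
the delays between them.

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

public class AsyncRetryWiring {
    // 'requests' and 'lookup' stand in for the job's own input stream and AsyncFunction.
    static DataStream<String> withRetry(DataStream<String> requests,
                                        AsyncFunction<String, String> lookup) {
        return AsyncDataStream.orderedWaitWithRetry(
                requests,
                lookup,
                30_000L,               // total timeout per element, across all retry attempts
                TimeUnit.MILLISECONDS,
                100,                   // max in-flight async requests
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 1_000L)
                        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                        .build());
    }
}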

Best,
Ron

Leon Xu  于2023年9月6日周三 12:07写道:

> Hi Ken,
>
> Thanks for the suggestion. Definitely a good call to just wrap the retry
> inside the client code. I'll give it a try.
> Besides that, Do you know if the async timeout is actually a global
> timeout? meaning it accounts for the time of each attempt call plus any
> interval time in between.
> I increase the async timeout and reduce the client timeout and it seems to
> help. But I will continue to monitor.
>
> Leon
>
> On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler 
> wrote:
>
>> Hi Leon,
>>
>> Normally I try to handle retrying in the client being used to call the
>> server, as you have more control/context.
>>
>> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
>> should work - when you say “it doesn’t seem to help much”, are you saying
>> that even with retry you get transient failures that you want to handle
>> better?
>>
>> If so, then you could implement the timeout() method in your
>> AsyncFunction, and complete with a special result that indicates you
>> exceeded the retry count. This would then avoid having the job restart.
>>
>> — Ken
>>
>> PS - note that you can also do something similar inside of the
>> asyncInvoke() method of your AsyncFunction, e.g:
>>
>> @Override
>>
>> *public* *void* asyncInvoke(String request,
>> ResultFuture resultFuture) *throws* Exception {
>>
>>
>>
>> *final* ServerResult timeoutResult = makeErrorResult(blah,
>> "Timeout");
>>
>>
>>
>> // Use your own executor, so that you're not relying on the size
>> of the common fork pool.
>>
>> CompletableFuture.*supplyAsync*(*new*
>> Supplier() {
>>
>>
>> @Override
>>
>> *public* ServerResult get() {
>>
>> *try* {
>>
>> *return* client.request(request);
>>
>> } *catch* (Exception e) {
>>
>> *LOGGER*.debug("Exception requesting " + request, e);
>>
>> *return* makeErrorResult(blah, e.getMessage());
>>
>> }
>>
>> }
>>
>> }, executor)
>>
>> .completeOnTimeout(timeoutResult, *REQUEST_TIMEOUT_SECONDS*,
>> TimeUnit.*SECONDS*)
>>
>> .thenAccept((ServerResult result) -> {
>>
>> ServerRequestResult requestResult = *new*
>>  ServerRequestResult();
>>
>> requestResult.setBlah();
>>
>> resultFuture.complete(Collections.*singleton*(fetchResult));
>>
>> });
>>
>> }
>>
>>
>> On Sep 5, 2023, at 12:16 PM, Leon Xu  wrote:
>>
>> Hi Flink users,
>>
>> We are using Flink AsyncIO to call a grpc-based service in our Flink job.
>> And from time to time we are experiencing Async function timeout issues,
>> here's the exception.
>> ```
>> java.lang.Exception: Could not complete the stream element: Record @
>> 169393916 : [B@cadc5b3.
>> Caused by: java.util.concurrent.TimeoutException: Async function call
>> has timed out.
>> ```
>> Every timeout will cause the job to restart, which seems to be very
>> expensive.
>>
>> On the server side it looks like these timeouts are transient and we were
>> expecting a retry will fix the issue.
>> We tried using the asyncIO retry strategy but it doesn't seem to help
>> much.
>> `AsyncDataStream.orderedWaitWithRetry`
>>
>> Do you have any suggestions on how to better reduce these timeout errors?
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>


Re: Question regarding asyncIO timeout

2023-09-05 Thread Leon Xu
Hi Ken,

Thanks for the suggestion. Definitely a good call to just wrap the retry
inside the client code. I'll give it a try.
Besides that, do you know if the async timeout is actually a global
timeout? Meaning, does it account for the time of each attempt call plus any
interval time in between?
I increased the async timeout and reduced the client timeout and it seems to
help. But I will continue to monitor.

Leon

On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler 
wrote:

> Hi Leon,
>
> Normally I try to handle retrying in the client being used to call the
> server, as you have more control/context.
>
> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
> should work - when you say “it doesn’t seem to help much”, are you saying
> that even with retry you get transient failures that you want to handle
> better?
>
> If so, then you could implement the timeout() method in your
> AsyncFunction, and complete with a special result that indicates you
> exceeded the retry count. This would then avoid having the job restart.
>
> — Ken
>
> PS - note that you can also do something similar inside of the
> asyncInvoke() method of your AsyncFunction, e.g:
>
> @Override
>
> *public* *void* asyncInvoke(String request,
> ResultFuture resultFuture) *throws* Exception {
>
>
>
> *final* ServerResult timeoutResult = makeErrorResult(blah,
> "Timeout");
>
>
>
> // Use your own executor, so that you're not relying on the size
> of the common fork pool.
>
> CompletableFuture.*supplyAsync*(*new*
> Supplier() {
>
>
> @Override
>
> *public* ServerResult get() {
>
> *try* {
>
> *return* client.request(request);
>
> } *catch* (Exception e) {
>
> *LOGGER*.debug("Exception requesting " + request, e);
>
> *return* makeErrorResult(blah, e.getMessage());
>
> }
>
> }
>
> }, executor)
>
> .completeOnTimeout(timeoutResult, *REQUEST_TIMEOUT_SECONDS*,
> TimeUnit.*SECONDS*)
>
> .thenAccept((ServerResult result) -> {
>
> ServerRequestResult requestResult = *new*
>  ServerRequestResult();
>
> requestResult.setBlah();
>
> resultFuture.complete(Collections.*singleton*(fetchResult));
>
> });
>
> }
>
>
> On Sep 5, 2023, at 12:16 PM, Leon Xu  wrote:
>
> Hi Flink users,
>
> We are using Flink AsyncIO to call a grpc-based service in our Flink job.
> And from time to time we are experiencing Async function timeout issues,
> here's the exception.
> ```
> java.lang.Exception: Could not complete the stream element: Record @
> 169393916 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has
> timed out.
> ```
> Every timeout will cause the job to restart, which seems to be very
> expensive.
>
> On the server side it looks like these timeouts are transient and we were
> expecting a retry will fix the issue.
> We tried using the asyncIO retry strategy but it doesn't seem to help much.
> `AsyncDataStream.orderedWaitWithRetry`
>
> Do you have any suggestions on how to better reduce these timeout errors?
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Re: Question regarding asyncIO timeout

2023-09-05 Thread Ken Krugler
Hi Leon,

Normally I try to handle retrying in the client being used to call the server, 
as you have more control/context.

If that’s not an option for you, then normally (un)orderedWaitWithRetry() 
should work - when you say “it doesn’t seem to help much”, are you saying that 
even with retry you get transient failures that you want to handle better?

If so, then you could implement the timeout() method in your AsyncFunction, and 
complete with a special result that indicates you exceeded the retry count. 
This would then avoid having the job restart.
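
A minimal sketch of that timeout() override (the types and the sentinel value are
illustrative, not from Leon's job):

import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class GrpcLookupFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String request, ResultFuture<String> resultFuture) {
        // ... issue the async call and complete resultFuture with its response ...
    }

    @Override
    public void timeout(String request, ResultFuture<String> resultFuture) {
        // Complete with a sentinel instead of failing, so the job keeps running.
        resultFuture.complete(Collections.singleton("TIMEOUT:" + request));
    }
}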

— Ken

PS - note that you can also do something similar inside of the asyncInvoke() 
method of your AsyncFunction, e.g:

@Override
public void asyncInvoke(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {

    final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");

    // Use your own executor, so that you're not relying on the size of the common fork pool.
    CompletableFuture.supplyAsync(new Supplier<ServerResult>() {

        @Override
        public ServerResult get() {
            try {
                return client.request(request);
            } catch (Exception e) {
                LOGGER.debug("Exception requesting " + request, e);
                return makeErrorResult(blah, e.getMessage());
            }
        }
    }, executor)
    .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    .thenAccept((ServerResult result) -> {
        ServerRequestResult requestResult = new ServerRequestResult();
        requestResult.setBlah();
        resultFuture.complete(Collections.singleton(fetchResult));
    });
}


> On Sep 5, 2023, at 12:16 PM, Leon Xu  wrote:
> 
> Hi Flink users,
> 
> We are using Flink AsyncIO to call a grpc-based service in our Flink job.
> And from time to time we are experiencing Async function timeout issues, 
> here's the exception.
> ```
> java.lang.Exception: Could not complete the stream element: Record @ 
> 169393916 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has 
> timed out.
> ```
> Every timeout will cause the job to restart, which seems to be very expensive.
> 
> On the server side it looks like these timeouts are transient and we were 
> expecting a retry will fix the issue.
> We tried using the asyncIO retry strategy but it doesn't seem to help much.
> `AsyncDataStream.orderedWaitWithRetry`
> 
> Do you have any suggestions on how to better reduce these timeout errors?
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Question regarding asyncIO timeout

2023-09-05 Thread Leon Xu
Hi Flink users,

We are using Flink AsyncIO to call a grpc-based service in our Flink job.
And from time to time we are experiencing Async function timeout issues,
here's the exception.
```
java.lang.Exception: Could not complete the stream element: Record @
169393916 : [B@cadc5b3.
Caused by: java.util.concurrent.TimeoutException: Async function call has
timed out.
```
Every timeout will cause the job to restart, which seems to be very
expensive.

On the server side it looks like these timeouts are transient and we were
expecting a retry will fix the issue.
We tried using the asyncIO retry strategy but it doesn't seem to help much.
`AsyncDataStream.orderedWaitWithRetry`

Do you have any suggestions on how to better reduce these timeout errors?


Re: [Question] How to scale application based on 'reactive' mode

2023-09-05 Thread Chen Zhanghao
Hi Dennis,


  1.  In Flink 1.18 + non-reactive mode, the autoscaler adjusts the job's 
parallelism; the job will request extra TMs if the current ones cannot 
satisfy its needs, and redundant TMs will be released automatically later for 
being idle. In other words, parallelism changes cause the TM number to change.
  2.  The core metric used is busy time (the amount of time spent on task 
processing per 1 second = 1 s - backpressured time - idle time). It is 
considered superior because it takes I/O cost etc. into account as well. Also, 
the metric is at per-task granularity and allows us to identify bottleneck 
tasks.
  3.  The autoscaler feature currently only works for the K8s operator + native K8s 
mode.

Best,
Zhanghao Chen

From: Dennis Jung 
Sent: September 2, 2023 12:58
To: Gyula Fóra 
Cc: user@flink.apache.org 
Subject: Re: [Question] How to scale application based on 'reactive' mode

Hello,
Thanks for your notice.

1. In "Flink 1.18 + non-reactive", is parallelism being changed by the number 
of TM?
2. In the 
document(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/),
 it said "we are not using any container memory / CPU utilization metrics 
directly here". Which metrics are these using internally?
3. I'm using standalone 
k8s(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
 for deployment. Are autoscaler features only available by using the "flink k8s 
operator" (sorry, I don't understand this clearly yet...)?

Regards


On Fri, Sep 1, 2023 at 10:20 PM, Gyula Fóra wrote:
Pretty much, except that with Flink 1.18 autoscaler can scale the job in place 
without restarting the JM (even without reactive mode )

So actually best option is autoscaler with Flink 1.18 native mode (no reactive)

Gyula

On Fri, 1 Sep 2023 at 13:54, Dennis Jung wrote:
Thanks for feedback.
Could you check whether I understand correctly?

Only using 'reactive' mode:
By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh 
start'), parallelism will be increased. For example, when job parallelism is 1 
and TM is 1, and if adding 1 new TM, JobManager will be restarted and 
parallelism will be 2.
But the number of TM is not being controlled automatically.

Autoscaler + non-reactive:
It can flexibly control the number of TMs via several metrics (CPU usage, 
throughput, ...), and the JobManager will be restarted when scaling. But job 
parallelism stays the same after the number of TMs has changed.

Autoscaler + 'reactive' mode:
It can control the number of TMs by metrics, and increase/decrease job parallelism 
by changing the number of TMs.

Regards,
Jung

On Fri, Sep 1, 2023 at 8:16 PM, Gyula Fóra wrote:
I would look at reactive scaling as a way to increase / decrease parallelism.

It’s not a way to automatically decide when to actually do it as you need to 
create new TMs .

The autoscaler could use reactive mode to change the parallelism but you need 
the autoscaler itself to decide when new resources should be added

On Fri, 1 Sep 2023 at 13:09, Dennis Jung wrote:
For now, the thing I've found about 'reactive' mode is that it automatically 
adjusts 'job parallelism' when TaskManager is increased/decreased.

https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode

Is there some other feature that only 'reactive' mode offers for scaling?

Thanks.
Regards.



On Fri, Sep 1, 2023 at 4:56 PM, Dennis Jung wrote:
Hello,
Thank you for your response. I have few more questions in following: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/

Reactive Mode configures a job so that it always uses all resources available 
in the cluster. Adding a TaskManager will scale up your job, removing resources 
will scale it down. Flink will manage the parallelism of the job, always 
setting it to the highest possible values.
=> Does this mean that when I add/remove a TaskManager in 'non-reactive' mode, 
the resources (CPU/memory/etc.) of the cluster are not changed?

Reactive Mode restarts a job on a rescaling event, restoring it from the latest 
completed checkpoint. This means that there is no overhead of creating a 
savepoint (which is needed for manually rescaling a job). Also, the amount of 
data that is reprocessed after rescaling depends on the checkpointing interval, 
and the restore time depends on the state size.
=> As far as I know, 'rescaling' also works in non-reactive mode by restoring a 
checkpoint. What is the difference when using 'reactive' here?

The Reactive Mode allows Flink users to implement a powerful autoscaling 
mechanism, by having an external service monitor certain metrics, such as 
consumer lag, aggregate CPU utilization, throughput or latency. As soon as

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
aggregate CPU utilization, throughput or
>>>>> latency. As soon as these metrics are above or below a certain threshold,
>>>>> additional TaskManagers can be added or removed from the Flink cluster.*
>>>>> => Why is this only possible in 'reactive' mode? Seems this is more
>>>>> related to 'autoscaler'. Are there some specific features/API which can
>>>>> control TaskManager/Parallelism only in 'reactive' mode?
>>>>>
>>>>> Thank you.
>>>>>
>>>>> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>>>>>
>>>>>> The reactive mode reacts to available resources. The autoscaler
>>>>>> reacts to changing load and processing capacity and adjusts resources.
>>>>>>
>>>>>> Completely different concepts and applicability.
>>>>>> Most people want the autoscaler , but this is a recent feature and is
>>>>>> specific to the k8s operator at the moment.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Thanks for your notice.
>>>>>>>
>>>>>>> Than what is the purpose of using 'reactive', if this doesn't do
>>>>>>> anything itself?
>>>>>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Jung
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> I think what you need is probably not the reactive mode but a
>>>>>>>> proper autoscaler. The reactive mode as you say doesn't do anything in
>>>>>>>> itself, you need to build a lot of logic around it.
>>>>>>>>
>>>>>>>> Check this instead:
>>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>>>>>>
>>>>>>>> The Kubernetes Operator has a built in autoscaler that can scale
>>>>>>>> jobs based on kafka data rate / processing throughput. It also doesn't 
>>>>>>>> rely
>>>>>>>> on the reactive mode.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> Sorry for frequent questions. This is a question about 'reactive'
>>>>>>>>> mode.
>>>>>>>>>
>>>>>>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>>>>>>> reactive`, it will not change parallelism automatically by itself, by 
>>>>>>>>> CPU
>>>>>>>>> usage or Kafka consumer rate. It needs additional resource monitor 
>>>>>>>>> features
>>>>>>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>>>>>>> 2. Is it possible to create a custom resource monitor provider
>>>>>>>>> application? For example, if I want to increase/decrease parallelism 
>>>>>>>>> by
>>>>>>>>> Kafka consumer rate, do I need to send specific API from outside, to 
>>>>>>>>> order
>>>>>>>>> rescaling?
>>>>>>>>> 3. If 2 is correct, what is the difference when using 'reactive'
>>>>>>>>> mode? Because as far as I think, calling a specific API will rescale 
>>>>>>>>> either
>>>>>>>>> using 'reactive' mode or not...(or is the API just working based on 
>>>>>>>>> this
>>>>>>>>> mode)?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>>
>>>>>>>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
i, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>>>
>>>>>> Hello,
>>>>>> Thanks for your notice.
>>>>>>
>>>>>> Than what is the purpose of using 'reactive', if this doesn't do
>>>>>> anything itself?
>>>>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>>>>
>>>>>> Regards,
>>>>>> Jung
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> I think what you need is probably not the reactive mode but a proper
>>>>>>> autoscaler. The reactive mode as you say doesn't do anything in itself, 
>>>>>>> you
>>>>>>> need to build a lot of logic around it.
>>>>>>>
>>>>>>> Check this instead:
>>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>>>>>
>>>>>>> The Kubernetes Operator has a built in autoscaler that can scale
>>>>>>> jobs based on kafka data rate / processing throughput. It also doesn't 
>>>>>>> rely
>>>>>>> on the reactive mode.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> Sorry for frequent questions. This is a question about 'reactive'
>>>>>>>> mode.
>>>>>>>>
>>>>>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>>>>>> reactive`, it will not change parallelism automatically by itself, by 
>>>>>>>> CPU
>>>>>>>> usage or Kafka consumer rate. It needs additional resource monitor 
>>>>>>>> features
>>>>>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>>>>>> 2. Is it possible to create a custom resource monitor provider
>>>>>>>> application? For example, if I want to increase/decrease parallelism by
>>>>>>>> Kafka consumer rate, do I need to send specific API from outside, to 
>>>>>>>> order
>>>>>>>> rescaling?
>>>>>>>> 3. If 2 is correct, what is the difference when using 'reactive'
>>>>>>>> mode? Because as far as I think, calling a specific API will rescale 
>>>>>>>> either
>>>>>>>> using 'reactive' mode or not...(or is the API just working based on 
>>>>>>>> this
>>>>>>>> mode)?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
Thanks for feedback.
Could you check whether I understand correctly?

*Only using 'reactive' mode:*
By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
start'), parallelism will be increased. For example, when job parallelism
is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
parallelism will be 2.
But the number of TM is not being controlled automatically.

*Autoscaler + non-reactive:*
It can flexibly control the number of TMs by several metrics (CPU usage,
throughput, ...), and JobManager will be restarted when scaling. But job
parallelism is the same after the number of TMs has been changed.

*Autoscaler + 'reactive' mode*:
It can control the number of TMs by metrics, and increase/decrease job
parallelism by changing TMs.

Regards,
Jung

2023년 9월 1일 (금) 오후 8:16, Gyula Fóra 님이 작성:

> I would look at reactive scaling as a way to increase / decrease
> parallelism.
>
> It’s not a way to automatically decide when to actually do it as you need
> to create new TMs .
>
> The autoscaler could use reactive mode to change the parallelism but you
> need the autoscaler itself to decide when new resources should be added
>
> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>
>> For now, the thing I've found about 'reactive' mode is that it
>> automatically adjusts 'job parallelism' when TaskManager is
>> increased/decreased.
>>
>>
>> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>>
>> Is there some other feature that only 'reactive' mode offers for scaling?
>>
>> Thanks.
>> Regards.
>>
>>
>>
>> 2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:
>>
>>> Hello,
>>> Thank you for your response. I have few more questions in following:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>>
>>> *Reactive Mode configures a job so that it always uses all resources
>>> available in the cluster. Adding a TaskManager will scale up your job,
>>> removing resources will scale it down. Flink will manage the parallelism of
>>> the job, always setting it to the highest possible values.*
>>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>>
>>> *Reactive Mode restarts a job on a rescaling event, restoring it from
>>> the latest completed checkpoint. This means that there is no overhead of
>>> creating a savepoint (which is needed for manually rescaling a job). Also,
>>> the amount of data that is reprocessed after rescaling depends on the
>>> checkpointing interval, and the restore time depends on the state size.*
>>> => As I know 'rescaling' also works in non-reactive mode, with restoring
>>> checkpoint. What is the difference of using 'reactive' here?
>>>
>>> *The Reactive Mode allows Flink users to implement a powerful
>>> autoscaling mechanism, by having an external service monitor certain
>>> metrics, such as consumer lag, aggregate CPU utilization, throughput or
>>> latency. As soon as these metrics are above or below a certain threshold,
>>> additional TaskManagers can be added or removed from the Flink cluster.*
>>> => Why is this only possible in 'reactive' mode? Seems this is more
>>> related to 'autoscaler'. Are there some specific features/API which can
>>> control TaskManager/Parallelism only in 'reactive' mode?
>>>
>>> Thank you.
>>>
>>> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>>>
>>>> The reactive mode reacts to available resources. The autoscaler reacts
>>>> to changing load and processing capacity and adjusts resources.
>>>>
>>>> Completely different concepts and applicability.
>>>> Most people want the autoscaler , but this is a recent feature and is
>>>> specific to the k8s operator at the moment.
>>>>
>>>> Gyula
>>>>
>>>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>>
>>>>> Hello,
>>>>> Thanks for your notice.
>>>>>
>>>>> Than what is the purpose of using 'reactive', if this doesn't do
>>>>> anything itself?
>>>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>>>
>>>>> Regards,
>>>>> Jung
>>>>>
>>>>>
>>>>>
>>>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>>

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
I would look at reactive scaling as a way to increase / decrease
parallelism.

It’s not a way to automatically decide when to actually do it as you need
to create new TMs .

The autoscaler could use reactive mode to change the parallelism but you
need the autoscaler itself to decide when new resources should be added

On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:

> For now, the thing I've found about 'reactive' mode is that it
> automatically adjusts 'job parallelism' when TaskManager is
> increased/decreased.
>
>
> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>
> Is there some other feature that only 'reactive' mode offers for scaling?
>
> Thanks.
> Regards.
>
>
>
> 2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:
>
>> Hello,
>> Thank you for your response. I have few more questions in following:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>
>> *Reactive Mode configures a job so that it always uses all resources
>> available in the cluster. Adding a TaskManager will scale up your job,
>> removing resources will scale it down. Flink will manage the parallelism of
>> the job, always setting it to the highest possible values.*
>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>
>> *Reactive Mode restarts a job on a rescaling event, restoring it from the
>> latest completed checkpoint. This means that there is no overhead of
>> creating a savepoint (which is needed for manually rescaling a job). Also,
>> the amount of data that is reprocessed after rescaling depends on the
>> checkpointing interval, and the restore time depends on the state size.*
>> => As I know 'rescaling' also works in non-reactive mode, with restoring
>> checkpoint. What is the difference of using 'reactive' here?
>>
>> *The Reactive Mode allows Flink users to implement a powerful autoscaling
>> mechanism, by having an external service monitor certain metrics, such as
>> consumer lag, aggregate CPU utilization, throughput or latency. As soon as
>> these metrics are above or below a certain threshold, additional
>> TaskManagers can be added or removed from the Flink cluster.*
>> => Why is this only possible in 'reactive' mode? Seems this is more
>> related to 'autoscaler'. Are there some specific features/API which can
>> control TaskManager/Parallelism only in 'reactive' mode?
>>
>> Thank you.
>>
>> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>>
>>> The reactive mode reacts to available resources. The autoscaler reacts
>>> to changing load and processing capacity and adjusts resources.
>>>
>>> Completely different concepts and applicability.
>>> Most people want the autoscaler , but this is a recent feature and is
>>> specific to the k8s operator at the moment.
>>>
>>> Gyula
>>>
>>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>
>>>> Hello,
>>>> Thanks for your notice.
>>>>
>>>> Than what is the purpose of using 'reactive', if this doesn't do
>>>> anything itself?
>>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>>
>>>> Regards,
>>>> Jung
>>>>
>>>>
>>>>
>>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>>
>>>>> Hi!
>>>>>
>>>>> I think what you need is probably not the reactive mode but a proper
>>>>> autoscaler. The reactive mode as you say doesn't do anything in itself, 
>>>>> you
>>>>> need to build a lot of logic around it.
>>>>>
>>>>> Check this instead:
>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>>>
>>>>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>>>>> based on kafka data rate / processing throughput. It also doesn't rely on
>>>>> the reactive mode.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Sorry for frequent questions. This is a question about 'reactive'
>>>>>> mode.
>>>>>>
>>>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>>>> reactive`, it will not change parallelism automatically by itself, by CPU
>>>>>> usage or Kafka consumer rate. It needs additional resource monitor 
>>>>>> features
>>>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>>>> 2. Is it possible to create a custom resource monitor provider
>>>>>> application? For example, if I want to increase/decrease parallelism by
>>>>>> Kafka consumer rate, do I need to send specific API from outside, to 
>>>>>> order
>>>>>> rescaling?
>>>>>> 3. If 2 is correct, what is the difference when using 'reactive'
>>>>>> mode? Because as far as I think, calling a specific API will rescale 
>>>>>> either
>>>>>> using 'reactive' mode or not...(or is the API just working based on this
>>>>>> mode)?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
For now, the thing I've found about 'reactive' mode is that it
automatically adjusts 'job parallelism' when TaskManager is
increased/decreased.

https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode

Is there some other feature that only 'reactive' mode offers for scaling?

Thanks.
Regards.



2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:

> Hello,
> Thank you for your response. I have few more questions in following:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>
> *Reactive Mode configures a job so that it always uses all resources
> available in the cluster. Adding a TaskManager will scale up your job,
> removing resources will scale it down. Flink will manage the parallelism of
> the job, always setting it to the highest possible values.*
> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>
> *Reactive Mode restarts a job on a rescaling event, restoring it from the
> latest completed checkpoint. This means that there is no overhead of
> creating a savepoint (which is needed for manually rescaling a job). Also,
> the amount of data that is reprocessed after rescaling depends on the
> checkpointing interval, and the restore time depends on the state size.*
> => As I know 'rescaling' also works in non-reactive mode, with restoring
> checkpoint. What is the difference of using 'reactive' here?
>
> *The Reactive Mode allows Flink users to implement a powerful autoscaling
> mechanism, by having an external service monitor certain metrics, such as
> consumer lag, aggregate CPU utilization, throughput or latency. As soon as
> these metrics are above or below a certain threshold, additional
> TaskManagers can be added or removed from the Flink cluster.*
> => Why is this only possible in 'reactive' mode? Seems this is more
> related to 'autoscaler'. Are there some specific features/API which can
> control TaskManager/Parallelism only in 'reactive' mode?
>
> Thank you.
>
> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>
>> The reactive mode reacts to available resources. The autoscaler reacts to
>> changing load and processing capacity and adjusts resources.
>>
>> Completely different concepts and applicability.
>> Most people want the autoscaler , but this is a recent feature and is
>> specific to the k8s operator at the moment.
>>
>> Gyula
>>
>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>
>>> Hello,
>>> Thanks for your notice.
>>>
>>> Than what is the purpose of using 'reactive', if this doesn't do
>>> anything itself?
>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>
>>> Regards,
>>> Jung
>>>
>>>
>>>
>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>
>>>> Hi!
>>>>
>>>> I think what you need is probably not the reactive mode but a proper
>>>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>>>> need to build a lot of logic around it.
>>>>
>>>> Check this instead:
>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>>
>>>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>>>> based on kafka data rate / processing throughput. It also doesn't rely on
>>>> the reactive mode.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>>>>
>>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>>> reactive`, it will not change parallelism automatically by itself, by CPU
>>>>> usage or Kafka consumer rate. It needs additional resource monitor 
>>>>> features
>>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>>> 2. Is it possible to create a custom resource monitor provider
>>>>> application? For example, if I want to increase/decrease parallelism by
>>>>> Kafka consumer rate, do I need to send specific API from outside, to order
>>>>> rescaling?
>>>>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>>>>> Because as far as I think, calling a specific API will rescale either 
>>>>> using
>>>>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Regards
>>>>>
>>>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
Hello,
Thank you for your response. I have a few more questions about the following:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/

*Reactive Mode configures a job so that it always uses all resources
available in the cluster. Adding a TaskManager will scale up your job,
removing resources will scale it down. Flink will manage the parallelism of
the job, always setting it to the highest possible values.*
=> Does this mean when I add/remove TaskManager in 'non-reactive' mode,
resource(CPU/Memory/Etc.) of the cluster is not being changed?

*Reactive Mode restarts a job on a rescaling event, restoring it from the
latest completed checkpoint. This means that there is no overhead of
creating a savepoint (which is needed for manually rescaling a job). Also,
the amount of data that is reprocessed after rescaling depends on the
checkpointing interval, and the restore time depends on the state size.*
=> As I know 'rescaling' also works in non-reactive mode, with restoring
checkpoint. What is the difference of using 'reactive' here?

*The Reactive Mode allows Flink users to implement a powerful autoscaling
mechanism, by having an external service monitor certain metrics, such as
consumer lag, aggregate CPU utilization, throughput or latency. As soon as
these metrics are above or below a certain threshold, additional
TaskManagers can be added or removed from the Flink cluster.*
=> Why is this only possible in 'reactive' mode? Seems this is more related
to 'autoscaler'. Are there some specific features/API which can control
TaskManager/Parallelism only in 'reactive' mode?

Thank you.

2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:

> The reactive mode reacts to available resources. The autoscaler reacts to
> changing load and processing capacity and adjusts resources.
>
> Completely different concepts and applicability.
> Most people want the autoscaler , but this is a recent feature and is
> specific to the k8s operator at the moment.
>
> Gyula
>
> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>
>> Hello,
>> Thanks for your notice.
>>
>> Than what is the purpose of using 'reactive', if this doesn't do anything
>> itself?
>> What is the difference if I use auto-scaler without 'reactive' mode?
>>
>> Regards,
>> Jung
>>
>>
>>
>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>
>>> Hi!
>>>
>>> I think what you need is probably not the reactive mode but a proper
>>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>>> need to build a lot of logic around it.
>>>
>>> Check this instead:
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>
>>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>>> based on kafka data rate / processing throughput. It also doesn't rely on
>>> the reactive mode.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>> wrote:
>>>
>>>> Hello,
>>>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>>>
>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>> reactive`, it will not change parallelism automatically by itself, by CPU
>>>> usage or Kafka consumer rate. It needs additional resource monitor features
>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>> 2. Is it possible to create a custom resource monitor provider
>>>> application? For example, if I want to increase/decrease parallelism by
>>>> Kafka consumer rate, do I need to send specific API from outside, to order
>>>> rescaling?
>>>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>>>> Because as far as I think, calling a specific API will rescale either using
>>>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>>>
>>>> Thanks.
>>>>
>>>> Regards
>>>>
>>>>
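
A minimal sketch of the parallelism bound that Reactive Mode respects, assuming a
trivial DataStream job (the job itself is illustrative, not from this thread):
Reactive Mode is enabled on the cluster side via `scheduler-mode: reactive` in
standalone application mode, so the job code does not turn it on; what the job can
control is the ceiling, because parallelism follows the available task slots but
never exceeds the configured max parallelism.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReactiveBoundSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Reactive Mode scales parallelism with the available slots, but never
        // above this value, so it is the knob that bounds how far adding
        // TaskManagers can scale the job up.
        env.setMaxParallelism(128);

        // Placeholder pipeline, purely illustrative.
        env.fromSequence(0, 1_000_000).print();

        env.execute("reactive-bound-sketch");
    }
}
```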


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
The reactive mode reacts to available resources. The autoscaler reacts to
changing load and processing capacity and adjusts resources.

Completely different concepts and applicability.
Most people want the autoscaler , but this is a recent feature and is
specific to the k8s operator at the moment.

Gyula

On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:

> Hello,
> Thanks for your notice.
>
> Than what is the purpose of using 'reactive', if this doesn't do anything
> itself?
> What is the difference if I use auto-scaler without 'reactive' mode?
>
> Regards,
> Jung
>
>
>
> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>
>> Hi!
>>
>> I think what you need is probably not the reactive mode but a proper
>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>> need to build a lot of logic around it.
>>
>> Check this instead:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>
>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>> based on kafka data rate / processing throughput. It also doesn't rely on
>> the reactive mode.
>>
>> Cheers,
>> Gyula
>>
>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:
>>
>>> Hello,
>>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>>
>>> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
>>> it will not change parallelism automatically by itself, by CPU usage or
>>> Kafka consumer rate. It needs additional resource monitor features (such as
>>> Horizontal Pod Autoscaler, or else). Is this correct?
>>> 2. Is it possible to create a custom resource monitor provider
>>> application? For example, if I want to increase/decrease parallelism by
>>> Kafka consumer rate, do I need to send specific API from outside, to order
>>> rescaling?
>>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>>> Because as far as I think, calling a specific API will rescale either using
>>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>>
>>> Thanks.
>>>
>>> Regards
>>>
>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-08-31 Thread Dennis Jung
Hello,
Thanks for your notice.

Then what is the purpose of using 'reactive', if this doesn't do anything
itself?
What is the difference if I use auto-scaler without 'reactive' mode?

Regards,
Jung



2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:

> Hi!
>
> I think what you need is probably not the reactive mode but a proper
> autoscaler. The reactive mode as you say doesn't do anything in itself, you
> need to build a lot of logic around it.
>
> Check this instead:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>
> The Kubernetes Operator has a built in autoscaler that can scale jobs
> based on kafka data rate / processing throughput. It also doesn't rely on
> the reactive mode.
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:
>
>> Hello,
>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>
>> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
>> it will not change parallelism automatically by itself, by CPU usage or
>> Kafka consumer rate. It needs additional resource monitor features (such as
>> Horizontal Pod Autoscaler, or else). Is this correct?
>> 2. Is it possible to create a custom resource monitor provider
>> application? For example, if I want to increase/decrease parallelism by
>> Kafka consumer rate, do I need to send specific API from outside, to order
>> rescaling?
>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>> Because as far as I think, calling a specific API will rescale either using
>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>
>> Thanks.
>>
>> Regards
>>
>>


Re: [Question] How to scale application based on 'reactive' mode

2023-08-18 Thread Gyula Fóra
Hi!

I think what you need is probably not the reactive mode but a proper
autoscaler. The reactive mode as you say doesn't do anything in itself, you
need to build a lot of logic around it.

Check this instead:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

The Kubernetes Operator has a built in autoscaler that can scale jobs based
on kafka data rate / processing throughput. It also doesn't rely on the
reactive mode.

Cheers,
Gyula

On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:

> Hello,
> Sorry for frequent questions. This is a question about 'reactive' mode.
>
> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
> it will not change parallelism automatically by itself, by CPU usage or
> Kafka consumer rate. It needs additional resource monitor features (such as
> Horizontal Pod Autoscaler, or else). Is this correct?
> 2. Is it possible to create a custom resource monitor provider
> application? For example, if I want to increase/decrease parallelism by
> Kafka consumer rate, do I need to send specific API from outside, to order
> rescaling?
> 3. If 2 is correct, what is the difference when using 'reactive' mode?
> Because as far as I think, calling a specific API will rescale either using
> 'reactive' mode or not...(or is the API just working based on this mode)?
>
> Thanks.
>
> Regards
>
>


[Question] How to scale application based on 'reactive' mode

2023-08-18 Thread Dennis Jung
Hello,
Sorry for frequent questions. This is a question about 'reactive' mode.

1. As far as I understand, though I've set up `scheduler-mode: reactive`, it
will not change parallelism automatically by itself, by CPU usage or Kafka
consumer rate. It needs additional resource monitor features (such as
Horizontal Pod Autoscaler, or else). Is this correct?
2. Is it possible to create a custom resource monitor provider application?
For example, if I want to increase/decrease parallelism by Kafka consumer
rate, do I need to send specific API from outside, to order rescaling?
3. If 2 is correct, what is the difference when using 'reactive' mode?
Because as far as I think, calling a specific API will rescale either using
'reactive' mode or not...(or is the API just working based on this mode)?

Thanks.

Regards


Re: [Question] Good way to monitor data skewness

2023-08-17 Thread Dennis Jung
Hello,
Thanks for feedback. I'll try to add setup in the internal Grafana.

BR,
JUNG


2023년 8월 16일 (수) 오후 6:29, Hang Ruan 님이 작성:

> Hi, Dennis.
>
> As Ron said, we could judge this situation by the metrics.
> We are usually reporting the metrics to the external system like
> Prometheus by the metric reporter[1]. And these metrics could be shown by
> some other tools like grafana[2].
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/#prometheus
> [2] https://grafana.com/oss/
>
> liu ron  于2023年8月16日周三 14:39写道:
>
>> Hi, Dennis,
>>
>> Although all operators are chained together, each operator metrics is
>> there, you can view the metrcis related to the corresponding operator's
>> input and output records through the UI, as following:
>> [image: image.png]
>>
>>
>> Best,
>> Ron
>>
>> Dennis Jung  于2023年8月16日周三 14:13写道:
>>
>>> Hello people,
>>> I'm trying to monitor data skewness with 'web-ui', between TaskManagers.
>>>
>>> Currently all operators has been chained, so I cannot find how data has
>>> been skewed to TaskManagers (or subtasks). But if I disable chaining,
>>> AFAIK, it can degrade performance.
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
>>>
>>> Could someone give a good suggestion on how to monitor whether data skew
>>> happens or not, in Flink app?
>>>
>>> Thank you.
>>>
>>>
>>>


Re: [Question] Good way to monitor data skewness

2023-08-16 Thread Hang Ruan
Hi, Dennis.

As Ron said, we could judge this situation by the metrics.
We are usually reporting the metrics to the external system like Prometheus
by the metric reporter[1]. And these metrics could be shown by some other
tools like grafana[2].

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/#prometheus
[2] https://grafana.com/oss/

liu ron  于2023年8月16日周三 14:39写道:

> Hi, Dennis,
>
> Although all operators are chained together, each operator metrics is
> there, you can view the metrcis related to the corresponding operator's
> input and output records through the UI, as following:
> [image: image.png]
>
>
> Best,
> Ron
>
> Dennis Jung  于2023年8月16日周三 14:13写道:
>
>> Hello people,
>> I'm trying to monitor data skewness with 'web-ui', between TaskManagers.
>>
>> Currently all operators has been chained, so I cannot find how data has
>> been skewed to TaskManagers (or subtasks). But if I disable chaining,
>> AFAIK, it can degrade performance.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
>>
>> Could someone give a good suggestion on how to monitor whether data skew
>> happens or not, in Flink app?
>>
>> Thank you.
>>
>>
>>


Re: [Question] Good way to monitor data skewness

2023-08-16 Thread liu ron
Hi, Dennis,

Although all operators are chained together, each operator's metrics are still
there; you can view the metrics related to the corresponding operator's
input and output records through the UI, as follows:
[image: image.png]


Best,
Ron

Dennis Jung  于2023年8月16日周三 14:13写道:

> Hello people,
> I'm trying to monitor data skewness with 'web-ui', between TaskManagers.
>
> Currently all operators has been chained, so I cannot find how data has
> been skewed to TaskManagers (or subtasks). But if I disable chaining,
> AFAIK, it can degrade performance.
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
>
> Could someone give a good suggestion on how to monitor whether data skew
> happens or not, in Flink app?
>
> Thank you.
>
>
>


[Question] Good way to monitor data skewness

2023-08-16 Thread Dennis Jung
Hello people,
I'm trying to monitor data skewness with 'web-ui', between TaskManagers.

Currently all operators have been chained, so I cannot see how data has
been skewed across TaskManagers (or subtasks). But if I disable chaining,
AFAIK, it can degrade performance.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups

Could someone give a good suggestion on how to monitor whether data skew
happens or not, in Flink app?

Thank you.
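
One way to address the chaining trade-off raised above, sketched under the
assumption of a simple socket-source pipeline (the source and the transform step
are illustrative, not from this thread): instead of disabling chaining globally,
break the chain only around the operator whose per-subtask record counts you want
to see, so the skew shows up in the web UI while the rest of the pipeline stays
chained.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkewInspectionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)            // illustrative source
                .map(new MapFunction<String, String>() {   // hypothetical transform step
                    @Override
                    public String map(String value) {
                        return value.trim();
                    }
                })
                // Isolate only this operator: it becomes its own task, so its
                // per-subtask numRecordsIn/numRecordsOut in the web UI shows how
                // records are distributed, while everything else stays chained
                // and the performance impact stays local.
                .disableChaining()
                .print();

        env.execute("skew-inspection-sketch");
    }
}
```

`startNewChain()` is the lighter variant if only the upstream side of the chain
needs to be broken.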


Re: Question about serialization of java.util classes

2023-08-15 Thread Alexis Sarda-Espinosa
gt;>> which contains java List, so I think you can refer to the related Pojo
>>> class implementation.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>>
>>> Best,
>>> Ron
>>>
>>>  于2023年8月13日周日 22:50写道:
>>>
>>>> Greetings,
>>>>
>>>> I am working on a project that needs to process around 100k events per
>>>> second and I'm trying to improve performance.
>>>>
>>>> Most of the classes being used are POJOs but have a couple of fields
>>>> using a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet`
>>>> etc. This forces Flink to use Kyro and throw these warnings:
>>>>
>>>> ```
>>>> class java.util.ArrayList does not contain a setter for field size
>>>> Class class java.util.ArrayList cannot be used as a POJO type because
>>>> not all fields are valid POJO fields, and must be processed as GenericType.
>>>> Please read the Flink documentation on "Data Types & Serialization" for
>>>> details of the effect on performance and schema evolution.
>>>> ```
>>>>
>>>> ```
>>>> No fields were detected for class java.util.HashSet so it cannot be
>>>> used as a POJO type and must be processed as GenericType. Please read the
>>>> Flink documentation on "Data Types & Serialization" for details of the
>>>> effect on performance and schema evolution.
>>>> I read through the documentation and stackoverflow and the conclusion
>>>> is that I need to make a TypeInfoFactory and use it inside a TypeInfo
>>>> annotation over my POJO.
>>>> ```
>>>>
>>>> My question is what do I need to do to get Flink to recognize my
>>>> classes as POJOs and use the POJO serializer for better performance?
>>>> I read through the documentation and stackoverflow and the conclusion
>>>> is that I need to make a TypeInfoFactory and use it inside a TypeInfo
>>>> annotation over my POJO.
>>>> While this seems incredibly tedious and I keep thinking "there must be
>>>> a better way", I would be fine with this solution if I could figure out how
>>>> to do this for the Set types I'm using.
>>>>
>>>> Any help would be appreciated.
>>>
>>>
>>>
>>
>


Re: Question about serialization of java.util classes

2023-08-15 Thread s
ore/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192>
>>> 
>>> Best,
>>> Ron
>>> 
>>> mailto:s...@sammar.sa>> 于2023年8月13日周日 22:50写道:
>>> Greetings,
>>> 
>>> I am working on a project that needs to process around 100k events per 
>>> second and I'm trying to improve performance.
>>> 
>>> Most of the classes being used are POJOs but have a couple of fields using 
>>> a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This 
>>> forces Flink to use Kyro and throw these warnings:
>>> 
>>> ```
>>> class java.util.ArrayList does not contain a setter for field size
>>> Class class java.util.ArrayList cannot be used as a POJO type because not 
>>> all fields are valid POJO fields, and must be processed as GenericType. 
>>> Please read the Flink documentation on "Data Types & Serialization" for 
>>> details of the effect on performance and schema evolution.
>>> ```
>>> 
>>> ```
>>> No fields were detected for class java.util.HashSet so it cannot be used as 
>>> a POJO type and must be processed as GenericType. Please read the Flink 
>>> documentation on "Data Types & Serialization" for details of the effect on 
>>> performance and schema evolution.
>>> I read through the documentation and stackoverflow and the conclusion is 
>>> that I need to make a TypeInfoFactory and use it inside a TypeInfo 
>>> annotation over my POJO.
>>> ```
>>> 
>>> My question is what do I need to do to get Flink to recognize my classes as 
>>> POJOs and use the POJO serializer for better performance?
>>> I read through the documentation and stackoverflow and the conclusion is 
>>> that I need to make a TypeInfoFactory and use it inside a TypeInfo 
>>> annotation over my POJO.
>>> While this seems incredibly tedious and I keep thinking "there must be a 
>>> better way", I would be fine with this solution if I could figure out how 
>>> to do this for the Set types I'm using.
>>> 
>>> Any help would be appreciated.
>> 
> 



Re: Question about serialization of java.util classes

2023-08-14 Thread Alexis Sarda-Espinosa
nk.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>
> at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>
> at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>
> at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
> at
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
> at com.example.App.main(App.java:26)
> ```
>
> Best regards,
> Saleh.
>
>
> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <
> user@flink.apache.org> wrote:
>
> Hi Saleh,
>
> If you could show us the minimal code example of the issue (event
> classes), I think someone could help you to solve it.
>
> Best regards,
> Alexey
>
> On Mon, Aug 14, 2023 at 9:23 AM  wrote:
>
>> Hi,
>>
>> According to this blog post
>> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>> The "Must be processed as GenericType" message means that the POJO
>> serializer will not be used and instead, Kyro will be used.
>>
>> I created a simple POJO to test it again with a java Collection but I got
>> the same message. Disabling generic types throws an exception.
>>
>> I'm not sure how to use these types along with the POJO serializer or any
>> other fast serializer.
>>
>> Best regards,
>> Saleh.
>>
>>
>>
>> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
>>
>> Hi,
>>
>> According to the test in [1], I think Flink can recognize Pojo class
>> which contains java List, so I think you can refer to the related Pojo
>> class implementation.
>>
>> [1]
>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>
>> Best,
>> Ron
>>
>>  于2023年8月13日周日 22:50写道:
>>
>>> Greetings,
>>>
>>> I am working on a project that needs to process around 100k events per
>>> second and I'm trying to improve performance.
>>>
>>> Most of the classes being used are POJOs but have a couple of fields
>>> using a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet`
>>> etc. This forces Flink to use Kyro and throw these warnings:
>>>
>>> ```
>>> class java.util.ArrayList does not contain a setter for field size
>>> Class class java.util.ArrayList cannot be used as a POJO type because
>>> not all fields are valid POJO fields, and must be processed as GenericType.
>>> Please read the Flink documentation on "Data Types & Serialization" for
>>> details of the effect on performance and schema evolution.
>>> ```
>>>
>>> ```
>>> No fields were detected for class java.util.HashSet so it cannot be used
>>> as a POJO type and must be processed as GenericType. Please read the Flink
>>> documentation on "Data Types & Serialization" for details of the effect on
>>> performance and schema evolution.
>>> I read through the documentation and stackoverflow and the conclusion is
>>> that I need to make a TypeInfoFactory and use it inside a TypeInfo
>>> annotation over my POJO.
>>> ```
>>>
>>> My question is what do I need to do to get Flink to recognize my classes
>>> as POJOs and use the POJO serializer for better performance?
>>> I read through the documentation and stackoverflow and the conclusion is
>>> that I need to make a TypeInfoFactory and use it inside a TypeInfo
>>> annotation over my POJO.
>>> While this seems incredibly tedious and I keep thinking "there must be a
>>> better way", I would be fine with this solution if I could figure out how
>>> to do this for the Set types I'm using.
>>>
>>> Any help would be appreciated.
>>
>>
>>
>


Re: Question about serialization of java.util classes

2023-08-14 Thread s
ache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
at com.example.App.main(App.java:26)
```

Best regards,
Saleh.


> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user  
> wrote:
> 
> Hi Saleh,
> 
> If you could show us the minimal code example of the issue (event classes), I 
> think someone could help you to solve it.
> 
> Best regards,
> Alexey
> 
> On Mon, Aug 14, 2023 at 9:23 AM mailto:s...@sammar.sa>> 
> wrote:
> Hi,
> 
> According to this blog post 
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>  
> <https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer>
> The "Must be processed as GenericType" message means that the POJO serializer 
> will not be used and instead, Kyro will be used.
> 
> I created a simple POJO to test it again with a java Collection but I got the 
> same message. Disabling generic types throws an exception.
> 
> I'm not sure how to use these types along with the POJO serializer or any 
> other fast serializer.
> 
> Best regards,
> Saleh.
> 
> 
> 
>> On 14 Aug 2023, at 4:59 AM, liu ron > <mailto:ron9@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> According to the test in [1], I think Flink can recognize Pojo class which 
>> contains java List, so I think you can refer to the related Pojo class 
>> implementation.
>> 
>> [1] 
>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>  
>> <https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192>
>> 
>> Best,
>> Ron
>> 
>> mailto:s...@sammar.sa>> 于2023年8月13日周日 22:50写道:
>> Greetings,
>> 
>> I am working on a project that needs to process around 100k events per 
>> second and I'm trying to improve performance.
>> 
>> Most of the classes being used are POJOs but have a couple of fields using a 
>> `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This 
>> forces Flink to use Kyro and throw these warnings:
>> 
>> ```
>> class java.util.ArrayList does not contain a setter for field size
>> Class class java.util.ArrayList cannot be used as a POJO type because not 
>> all fields are valid POJO fields, and must be processed as GenericType. 
>> Please read the Flink documentation on "Data Types & Serialization" for 
>> details of the effect on performance and schema evolution.
>> ```
>> 
>> ```
>> No fields were detected for class java.util.HashSet so it cannot be used as 
>> a POJO type and must be processed as GenericType. Please read the Flink 
>> documentation on "Data Types & Serialization" for details of the effect on 
>> performance and schema evolution.
>> I read through the documentation and stackoverflow and the conclusion is 
>> that I need to make a TypeInfoFactory and use it inside a TypeInfo 
>> annotation over my POJO.
>> ```
>> 
>> My question is what do I need to do to get Flink to recognize my classes as 
>> POJOs and use the POJO serializer for better performance?
>> I read through the documentation and stackoverflow and the conclusion is 
>> that I need to make a TypeInfoFactory and use it inside a TypeInfo 
>> annotation over my POJO.
>> While this seems incredibly tedious and I keep thinking "there must be a 
>> better way", I would be fine with this solution if I could figure out how to 
>> do this for the Set types I'm using.
>> 
>> Any help would be appreciated.
> 



Re: Question about serialization of java.util classes

2023-08-14 Thread Alexey Novakov via user
Hi Saleh,

If you could show us the minimal code example of the issue (event classes),
I think someone could help you to solve it.

Best regards,
Alexey

On Mon, Aug 14, 2023 at 9:23 AM  wrote:

> Hi,
>
> According to this blog post
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
> The "Must be processed as GenericType" message means that the POJO
> serializer will not be used and instead, Kyro will be used.
>
> I created a simple POJO to test it again with a java Collection but I got
> the same message. Disabling generic types throws an exception.
>
> I'm not sure how to use these types along with the POJO serializer or any
> other fast serializer.
>
> Best regards,
> Saleh.
>
>
>
> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
>
> Hi,
>
> According to the test in [1], I think Flink can recognize Pojo class which
> contains java List, so I think you can refer to the related Pojo class
> implementation.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>
> Best,
> Ron
>
>  于2023年8月13日周日 22:50写道:
>
>> Greetings,
>>
>> I am working on a project that needs to process around 100k events per
>> second and I'm trying to improve performance.
>>
>> Most of the classes being used are POJOs but have a couple of fields
>> using a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet`
>> etc. This forces Flink to use Kyro and throw these warnings:
>>
>> ```
>> class java.util.ArrayList does not contain a setter for field size
>> Class class java.util.ArrayList cannot be used as a POJO type because not
>> all fields are valid POJO fields, and must be processed as GenericType.
>> Please read the Flink documentation on "Data Types & Serialization" for
>> details of the effect on performance and schema evolution.
>> ```
>>
>> ```
>> No fields were detected for class java.util.HashSet so it cannot be used
>> as a POJO type and must be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>> I read through the documentation and stackoverflow and the conclusion is
>> that I need to make a TypeInfoFactory and use it inside a TypeInfo
>> annotation over my POJO.
>> ```
>>
>> My question is what do I need to do to get Flink to recognize my classes
>> as POJOs and use the POJO serializer for better performance?
>> I read through the documentation and stackoverflow and the conclusion is
>> that I need to make a TypeInfoFactory and use it inside a TypeInfo
>> annotation over my POJO.
>> While this seems incredibly tedious and I keep thinking "there must be a
>> better way", I would be fine with this solution if I could figure out how
>> to do this for the Set types I'm using.
>>
>> Any help would be appreciated.
>
>
>


Re: Question about serialization of java.util classes

2023-08-14 Thread s
Hi,

According to this blog post 
https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
 
<https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer>
The "Must be processed as GenericType" message means that the POJO serializer 
will not be used and instead, Kryo will be used.

I created a simple POJO to test it again with a java Collection but I got the 
same message. Disabling generic types throws an exception.

I'm not sure how to use these types along with the POJO serializer or any other 
fast serializer.

Best regards,
Saleh.



> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
> 
> Hi,
> 
> According to the test in [1], I think Flink can recognize Pojo class which 
> contains java List, so I think you can refer to the related Pojo class 
> implementation.
> 
> [1] 
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>  
> <https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192>
> 
> Best,
> Ron
> 
> mailto:s...@sammar.sa>> 于2023年8月13日周日 22:50写道:
> Greetings,
> 
> I am working on a project that needs to process around 100k events per second 
> and I'm trying to improve performance.
> 
> Most of the classes being used are POJOs but have a couple of fields using a 
> `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This 
> forces Flink to use Kyro and throw these warnings:
> 
> ```
> class java.util.ArrayList does not contain a setter for field size
> Class class java.util.ArrayList cannot be used as a POJO type because not all 
> fields are valid POJO fields, and must be processed as GenericType. Please 
> read the Flink documentation on "Data Types & Serialization" for details of 
> the effect on performance and schema evolution.
> ```
> 
> ```
> No fields were detected for class java.util.HashSet so it cannot be used as a 
> POJO type and must be processed as GenericType. Please read the Flink 
> documentation on "Data Types & Serialization" for details of the effect on 
> performance and schema evolution.
> I read through the documentation and stackoverflow and the conclusion is that 
> I need to make a TypeInfoFactory and use it inside a TypeInfo annotation over 
> my POJO.
> ```
> 
> My question is what do I need to do to get Flink to recognize my classes as 
> POJOs and use the POJO serializer for better performance?
> I read through the documentation and stackoverflow and the conclusion is that 
> I need to make a TypeInfoFactory and use it inside a TypeInfo annotation over 
> my POJO.
> While this seems incredibly tedious and I keep thinking "there must be a 
> better way", I would be fine with this solution if I could figure out how to 
> do this for the Set types I'm using.
> 
> Any help would be appreciated.
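
A side note on the "disabling generic types throws an exception" observation
above: that behavior can be used deliberately as a guard. A minimal sketch,
assuming nothing beyond a standard StreamExecutionEnvironment:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoKryoGuardSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail fast instead of silently falling back to Kryo: any type that
        // would be handled as a GenericType now raises an exception when the
        // job is built, which is how you find every field that still needs a
        // TypeInfoFactory or a POJO-friendly type.
        env.getConfig().disableGenericTypes();

        // ... build the real pipeline here; this placeholder uses only basic types.
        env.fromSequence(0, 10).print();

        env.execute("no-kryo-guard-sketch");
    }
}
```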



Re: Question about serialization of java.util classes

2023-08-13 Thread liu ron
Hi,

According to the test in [1], I think Flink can recognize Pojo class which
contains java List, so I think you can refer to the related Pojo class
implementation.

[1]
https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192

Best,
Ron

 于2023年8月13日周日 22:50写道:

> Greetings,
>
> I am working on a project that needs to process around 100k events per
> second and I'm trying to improve performance.
>
> Most of the classes being used are POJOs but have a couple of fields using
> a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This
> forces Flink to use Kyro and throw these warnings:
>
> ```
> class java.util.ArrayList does not contain a setter for field size
> Class class java.util.ArrayList cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType.
> Please read the Flink documentation on "Data Types & Serialization" for
> details of the effect on performance and schema evolution.
> ```
>
> ```
> No fields were detected for class java.util.HashSet so it cannot be used
> as a POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance and schema evolution.
> I read through the documentation and stackoverflow and the conclusion is
> that I need to make a TypeInfoFactory and use it inside a TypeInfo
> annotation over my POJO.
> ```
>
> My question is what do I need to do to get Flink to recognize my classes
> as POJOs and use the POJO serializer for better performance?
> I read through the documentation and stackoverflow and the conclusion is
> that I need to make a TypeInfoFactory and use it inside a TypeInfo
> annotation over my POJO.
> While this seems incredibly tedious and I keep thinking "there must be a
> better way", I would be fine with this solution if I could figure out how
> to do this for the Set types I'm using.
>
> Any help would be appreciated.


Question about serialization of java.util classes

2023-08-13 Thread s
Greetings,

I am working on a project that needs to process around 100k events per second 
and I'm trying to improve performance.

Most of the classes being used are POJOs but have a couple of fields using a 
`java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This 
forces Flink to use Kryo and throw these warnings:

```
class java.util.ArrayList does not contain a setter for field size
Class class java.util.ArrayList cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
```

```
No fields were detected for class java.util.HashSet so it cannot be used as a 
POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
```

My question is what do I need to do to get Flink to recognize my classes as 
POJOs and use the POJO serializer for better performance?
I read through the documentation and stackoverflow and the conclusion is that I 
need to make a TypeInfoFactory and use it inside a TypeInfo annotation over my 
POJO.
While this seems incredibly tedious and I keep thinking "there must be a better 
way", I would be fine with this solution if I could figure out how to do this 
for the Set types I'm using.

Any help would be appreciated.

RE: Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian,

I think we can use podTemplate to mount Kubernetes secrets as files or
environment variables.
Then we can access the secrets in our Flink program.

Please refer to

https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
 

https://kubernetes.io/docs/concepts/configuration/secret/#using-a-secret 
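
For illustration, once the secret is exposed to the container as an environment
variable through the pod template (the variable name below is made up), the job
can read it at startup instead of carrying the plaintext in its configuration;
a minimal sketch:

```
public class SecretFromEnv {
    public static void main(String[] args) {
        // DB_PASSWORD (name is illustrative) is injected from a Kubernetes Secret via the
        // pod template's env/envFrom, so the plaintext never appears in flink-conf.yaml
        // or in the FlinkDeployment spec.
        String password = System.getenv("DB_PASSWORD");
        if (password == null || password.isEmpty()) {
            throw new IllegalStateException("DB_PASSWORD not set; check the pod template secret mapping");
        }
        // Use `password` when building connector options / DDL strings in the job's main().
        System.out.println("password loaded, length=" + password.length());
    }
}
```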


On 2023/07/21 10:53:10 tian tian wrote:
> Like s3.secret-key, the plaintext password cannot be directly written in
> the configuration. Is there a template language like jinja that can be
> replaced after mounting to the pod?
> 
> >
> 

Re: flink configuration in flink kubernetes operator question about password

2023-07-21 Thread tian tian
For options like s3.secret-key, the plaintext password cannot be written
directly in the configuration. Is there a template language like Jinja whose
placeholders can be replaced after the file is mounted to the pod?

>


flink configuration in flink kubernetes operator question about password

2023-07-21 Thread tian tian
Hi all, how do we specify passwords and other information that needs to be
encrypted in the configuration file?


Re: apache-flink java question

2023-06-01 Thread Jing Ge via user
Hi Chris,

Not yet, and we are working on it [1].

best regards,
Jing

[1]
https://issues.apache.org/jira/browse/FLINK-15736?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17697544#comment-17697544

On Thu, Jun 1, 2023 at 11:40 PM Joseph, Chris S 
wrote:

> Hi,
>
>
>
> Does Apache flink table api work with java 17?
>
>
>
> Thanks,
>
> Chris Joseph
>
>
>


apache-flink java question

2023-06-01 Thread Joseph, Chris S
Hi,

Does Apache flink table api work with java 17?

Thanks,
Chris Joseph



Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-23 Thread Gyula Fóra
Hi Andrew!

I think you are completely right, this is a bug. The per namespace metrics
do not seem to filter per namespace and show the aggregated global count
for each namespace.

I opened a ticket:
https://issues.apache.org/jira/browse/FLINK-32164

Thanks for reporting this!
Gyula

On Mon, May 22, 2023 at 10:49 PM Andrew Otto  wrote:

> Also!  I do have 2 FlinkDeployments deployed with this operator, but they
> are in different namespaces, and each of the per namespace metrics reports
> that it has 2 Deployments in them, even though there is only one according
> to kubectl.
>
> Actually...we just tried to deploy a change (enabling some checkpointing)
> that caused one of our FlinkDeployments to fail.  Now, both namespace
> STABLE_Counts each report 1.
>
> # curl -s : | grep
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="rdf_streaming_updater",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
>
> It looks like maybe this metric is not reporting per namespace, but a
> global count.
>
>
>
> On Mon, May 22, 2023 at 2:56 PM Andrew Otto  wrote:
>
>> Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in
>> my examples there, I am curl-ing the leader flink operator pod.
>>
>>
>>
>> On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:
>>
>>> Hello!
>>>
>>> I'm doing some grafana+prometheus dashboarding for
>>> flink-kubernetes-operator.  Reading metrics docs
>>> , I see that I have nice per k8s
>>> namespace lifecycle current count gauge metrics in Prometheus.
>>>
>>> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>>>
>>> # kubectl -n stream-enrichment-poc get flinkdeployments
>>> NAME JOB STATUS   LIFECYCLE STATE
>>> flink-app-main   RUNNING  STABLE
>>>
>>> But, prometheus is reporting that I have 2 FlinkDeployments in the
>>> STABLE state.
>>>
>>> # curl -s :  | grep
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>>> 2.0
>>>
>>> I'm not sure why I see 2.0 reported.
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
>>> one FlinkDeployment.
>>>
>>> # curl :/metrics | grep
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>>> 1.0
>>>
>>> Is it possible that
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
>>> reported as an incrementing counter instead of a gauge?
>>>
>>> Thanks
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>>


Re: Question about Flink exception handling

2023-05-23 Thread Sharif Khan via user
Thanks for the clarification.

On Tue, May 23, 2023 at 7:07 PM Weihua Hu  wrote:

> Hi Sharif,
>
> You cannot catch exceptions globally.
>
> For exceptions that can be explicitly ignored for your business, you need
> to add a try-catch in the operators.
> For exceptions that are not caught, Flink will trigger a recovery from
> failure automatically[1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/task_failure_recovery/
>
> Best,
> Weihua
>
>
> On Tue, May 23, 2023 at 12:24 PM Sharif Khan via user <
> user@flink.apache.org> wrote:
>
>> Thanks for your response.
>>
>> For simplicity, I want to capture exceptions in a centralized manner and
>> log them for further analysis, without interrupting the job's execution or
>> causing it to restart.
>>
>> On Tue, May 23, 2023 at 6:31 AM Shammon FY  wrote:
>>
>>> Hi Sharif,
>>>
>>> I would like to know what you want to do with the exception after
>>> catching it? There are different ways for different requirements, for
>>> example, Flink has already reported these exceptions.
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Mon, May 22, 2023 at 4:45 PM Sharif Khan via user <
>>> user@flink.apache.org> wrote:
>>>
 Hi, community.
 Can anyone please let me know?

 1. What is the best practice in terms of handling exceptions in Flink
 jobs?

 2. Is there any way to catch exceptions globally in Flink jobs?
 Basically, I want to catch exceptions from any operators in one place
 (globally).

 my expectation is let's say I have a pipeline
 source-> operator(A) -> operator(B) -> operator(C) -> sink.
 I don't want to write a try-catch for every operator. Is it possible to
 write one try-catch for the whole pipeline?

 I'm using the Python version of the Flink API. version 1.16

 Thanks in advance.

 [image: SELISE]

 SELISE Group
 Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
 Munich: Tal 44, 80331 München, Germany
 Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
 Emirates
 Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
 Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
 Bhutan

 Visit us: www.selisegroup.com

 *Important Note: This e-mail and any attachment are confidential and
 may contain trade secrets and may well also be legally privileged or
 otherwise protected from disclosure. If you have received it in error, you
 are on notice of its status. Please notify us immediately by reply e-mail
 and then delete this e-mail and any attachment from your system. If you are
 not the intended recipient please understand that you must not copy this
 e-mail or any attachment or disclose the contents to any other person.
 Thank you for your cooperation.*

>>>
>>
>> --
>>
>> Md. Sharif Khan, BSc in CSE, DIU
>>
>> Software Engineer
>>
>> Mobile: +880 1741976078
>>
>> [image: SELISE]
>>
>> SELISE Group
>> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
>> Munich: Tal 44, 80331 München, Germany
>> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
>> Emirates
>> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
>> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
>> Bhutan
>>
>> Visit us: www.selisegroup.com
>>
>> *Important Note: This e-mail and any attachment are confidential and may
>> contain trade secrets and may well also be legally privileged or otherwise
>> protected from disclosure. If you have received it in error, you are on
>> notice of its status. Please notify us immediately by reply e-mail and then
>> delete this e-mail and any attachment from your system. If you are not the
>> intended recipient please understand that you must not copy this e-mail or
>> any attachment or disclose the contents to any other person. Thank you for
>> your cooperation.*
>>
>

-- 

Md. Sharif Khan, BSc in CSE, DIU

Software Engineer

Mobile: +880 1741976078


Re: Question about Flink exception handling

2023-05-23 Thread Weihua Hu
Hi Sharif,

You cannot catch exceptions globally.

For exceptions that can be explicitly ignored for your business, you need
to add a try-catch in the operators.
For exceptions that are not caught, Flink will trigger a recovery from
failure automatically[1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/task_failure_recovery/

Best,
Weihua
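
For illustration, a minimal sketch of that per-operator pattern in Java (the
class name and parsing logic are made up; in PyFlink the same idea is a
try/except around the body of the map/flat_map function):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical parse step: exceptions are caught, logged and the record is dropped,
// so the job keeps running instead of restarting.
public class SafeParse implements FlatMapFunction<String, Long> {
    private static final Logger LOG = LoggerFactory.getLogger(SafeParse.class);

    @Override
    public void flatMap(String value, Collector<Long> out) {
        try {
            out.collect(Long.parseLong(value.trim()));
        } catch (Exception e) {
            // log for later analysis; alternatively route to a side output / dead-letter sink
            LOG.warn("Dropping malformed record '{}': {}", value, e.toString());
        }
    }
}
```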


On Tue, May 23, 2023 at 12:24 PM Sharif Khan via user 
wrote:

> Thanks for your response.
>
> For simplicity, I want to capture exceptions in a centralized manner and
> log them for further analysis, without interrupting the job's execution or
> causing it to restart.
>
> On Tue, May 23, 2023 at 6:31 AM Shammon FY  wrote:
>
>> Hi Sharif,
>>
>> I would like to know what you want to do with the exception after
>> catching it? There are different ways for different requirements, for
>> example, Flink has already reported these exceptions.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Mon, May 22, 2023 at 4:45 PM Sharif Khan via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi, community.
>>> Can anyone please let me know?
>>>
>>> 1. What is the best practice in terms of handling exceptions in Flink
>>> jobs?
>>>
>>> 2. Is there any way to catch exceptions globally in Flink jobs?
>>> Basically, I want to catch exceptions from any operators in one place
>>> (globally).
>>>
>>> my expectation is let's say I have a pipeline
>>> source-> operator(A) -> operator(B) -> operator(C) -> sink.
>>> I don't want to write a try-catch for every operator. Is it possible to
>>> write one try-catch for the whole pipeline?
>>>
>>> I'm using the Python version of the Flink API. version 1.16
>>>
>>> Thanks in advance.
>>>
>>> [image: SELISE]
>>>
>>> SELISE Group
>>> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
>>> Munich: Tal 44, 80331 München, Germany
>>> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
>>> Emirates
>>> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
>>> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
>>> Bhutan
>>>
>>> Visit us: www.selisegroup.com
>>>
>>> *Important Note: This e-mail and any attachment are confidential and may
>>> contain trade secrets and may well also be legally privileged or otherwise
>>> protected from disclosure. If you have received it in error, you are on
>>> notice of its status. Please notify us immediately by reply e-mail and then
>>> delete this e-mail and any attachment from your system. If you are not the
>>> intended recipient please understand that you must not copy this e-mail or
>>> any attachment or disclose the contents to any other person. Thank you for
>>> your cooperation.*
>>>
>>
>
> --
>
> Md. Sharif Khan, BSc in CSE, DIU
>
> Software Engineer
>
> Mobile: +880 1741976078
>
> [image: SELISE]
>
> SELISE Group
> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
> Munich: Tal 44, 80331 München, Germany
> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
> Emirates
> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
> Bhutan
>
> Visit us: www.selisegroup.com
>
> *Important Note: This e-mail and any attachment are confidential and may
> contain trade secrets and may well also be legally privileged or otherwise
> protected from disclosure. If you have received it in error, you are on
> notice of its status. Please notify us immediately by reply e-mail and then
> delete this e-mail and any attachment from your system. If you are not the
> intended recipient please understand that you must not copy this e-mail or
> any attachment or disclose the contents to any other person. Thank you for
> your cooperation.*
>


Re: Question about Flink exception handling

2023-05-22 Thread Sharif Khan via user
Thanks for your response.

For simplicity, I want to capture exceptions in a centralized manner and
log them for further analysis, without interrupting the job's execution or
causing it to restart.

On Tue, May 23, 2023 at 6:31 AM Shammon FY  wrote:

> Hi Sharif,
>
> I would like to know what you want to do with the exception after
> catching it? There are different ways for different requirements, for
> example, Flink has already reported these exceptions.
>
> Best,
> Shammon FY
>
>
> On Mon, May 22, 2023 at 4:45 PM Sharif Khan via user <
> user@flink.apache.org> wrote:
>
>> Hi, community.
>> Can anyone please let me know?
>>
>> 1. What is the best practice in terms of handling exceptions in Flink
>> jobs?
>>
>> 2. Is there any way to catch exceptions globally in Flink jobs?
>> Basically, I want to catch exceptions from any operators in one place
>> (globally).
>>
>> my expectation is let's say I have a pipeline
>> source-> operator(A) -> operator(B) -> operator(C) -> sink.
>> I don't want to write a try-catch for every operator. Is it possible to
>> write one try-catch for the whole pipeline?
>>
>> I'm using the Python version of the Flink API. version 1.16
>>
>> Thanks in advance.
>>
>> [image: SELISE]
>>
>> SELISE Group
>> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
>> Munich: Tal 44, 80331 München, Germany
>> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
>> Emirates
>> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
>> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
>> Bhutan
>>
>> Visit us: www.selisegroup.com
>>
>> *Important Note: This e-mail and any attachment are confidential and may
>> contain trade secrets and may well also be legally privileged or otherwise
>> protected from disclosure. If you have received it in error, you are on
>> notice of its status. Please notify us immediately by reply e-mail and then
>> delete this e-mail and any attachment from your system. If you are not the
>> intended recipient please understand that you must not copy this e-mail or
>> any attachment or disclose the contents to any other person. Thank you for
>> your cooperation.*
>>
>

-- 

Md. Sharif Khan, BSc in CSE, DIU

Software Engineer

Mobile: +880 1741976078



Re: Question about Flink exception handling

2023-05-22 Thread Shammon FY
Hi Sharif,

I would like to know what you want to do with the exception after
catching it? There are different ways for different requirements, for
example, Flink has already reported these exceptions.

Best,
Shammon FY


On Mon, May 22, 2023 at 4:45 PM Sharif Khan via user 
wrote:

> Hi, community.
> Can anyone please let me know?
>
> 1. What is the best practice in terms of handling exceptions in Flink jobs?
>
> 2. Is there any way to catch exceptions globally in Flink jobs? Basically,
> I want to catch exceptions from any operators in one place (globally).
>
> my expectation is let's say I have a pipeline
> source-> operator(A) -> operator(B) -> operator(C) -> sink.
> I don't want to write a try-catch for every operator. Is it possible to
> write one try-catch for the whole pipeline?
>
> I'm using the Python version of the Flink API. version 1.16
>
> Thanks in advance.
>
> [image: SELISE]
>
> SELISE Group
> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
> Munich: Tal 44, 80331 München, Germany
> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
> Emirates
> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
> Bhutan
>
> Visit us: www.selisegroup.com
>
> *Important Note: This e-mail and any attachment are confidential and may
> contain trade secrets and may well also be legally privileged or otherwise
> protected from disclosure. If you have received it in error, you are on
> notice of its status. Please notify us immediately by reply e-mail and then
> delete this e-mail and any attachment from your system. If you are not the
> intended recipient please understand that you must not copy this e-mail or
> any attachment or disclose the contents to any other person. Thank you for
> your cooperation.*
>


Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Also!  I do have 2 FlinkDeployments deployed with this operator, but they
are in different namespaces, and each of the per namespace metrics reports
that it has 2 Deployments in them, even though there is only one according
to kubectl.

Actually...we just tried to deploy a change (enabling some checkpointing)
that caused one of our FlinkDeployments to fail.  Now, both namespace
STABLE_Counts each report 1.

# curl -s : | grep
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="rdf_streaming_updater",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0

It looks like maybe this metric is not reporting per namespace, but a
global count.



On Mon, May 22, 2023 at 2:56 PM Andrew Otto  wrote:

> Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in my
> examples there, I am curl-ing the leader flink operator pod.
>
>
>
> On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:
>
>> Hello!
>>
>> I'm doing some grafana+prometheus dashboarding for
>> flink-kubernetes-operator.  Reading metrics docs
>> , I see that I have nice per k8s
>> namespace lifecycle current count gauge metrics in Prometheus.
>>
>> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>>
>> # kubectl -n stream-enrichment-poc get flinkdeployments
>> NAME JOB STATUS   LIFECYCLE STATE
>> flink-app-main   RUNNING  STABLE
>>
>> But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
>> state.
>>
>> # curl -s :  | grep
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>> 2.0
>>
>> I'm not sure why I see 2.0 reported.
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
>> one FlinkDeployment.
>>
>> # curl :/metrics | grep
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>> 1.0
>>
>> Is it possible that
>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
>> reported as an incrementing counter instead of a gauge?
>>
>> Thanks
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>


Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in my
examples there, I am curl-ing the leader flink operator pod.



On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:

> Hello!
>
> I'm doing some grafana+prometheus dashboarding for
> flink-kubernetes-operator.  Reading metrics docs
> , I see that I have nice per k8s
> namespace lifecycle current count gauge metrics in Prometheus.
>
> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>
> # kubectl -n stream-enrichment-poc get flinkdeployments
> NAME JOB STATUS   LIFECYCLE STATE
> flink-app-main   RUNNING  STABLE
>
> But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
> state.
>
> # curl -s :  | grep
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 2.0
>
> I'm not sure why I see 2.0 reported.
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
> one FlinkDeployment.
>
> # curl :/metrics | grep
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
>
> Is it possible that
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
> reported as an incrementing counter instead of a gauge?
>
> Thanks
> -Andrew Otto
>  Wikimedia Foundation
>
>


Flink Kubernetes Operator lifecycle state count metrics question

2023-05-22 Thread Andrew Otto
Hello!

I'm doing some grafana+prometheus dashboarding for
flink-kubernetes-operator.  Reading metrics docs
, I see that I have nice per k8s
namespace lifecycle current count gauge metrics in Prometheus.

Using kubectl, I can see that I have one FlinkDeployment in my namespace:

# kubectl -n stream-enrichment-poc get flinkdeployments
NAME JOB STATUS   LIFECYCLE STATE
flink-app-main   RUNNING  STABLE

But, prometheus is reporting that I have 2 FlinkDeployments in the STABLE
state.

# curl -s :  | grep
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
2.0

I'm not sure why I see 2.0 reported.
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only one
FlinkDeployment.

# curl :/metrics | grep
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
1.0

Is it possible that flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
is being reported as an incrementing counter instead of a gauge?

Thanks
-Andrew Otto
 Wikimedia Foundation


Question about Flink exception handling

2023-05-22 Thread Sharif Khan via user
Hi, community.
Can anyone please let me know?

1. What is the best practice in terms of handling exceptions in Flink jobs?

2. Is there any way to catch exceptions globally in Flink jobs? Basically,
I want to catch exceptions from any operators in one place (globally).

my expectation is let's say I have a pipeline
source-> operator(A) -> operator(B) -> operator(C) -> sink.
I don't want to write a try-catch for every operator. Is it possible to
write one try-catch for the whole pipeline?

I'm using the Python version of the Flink API. version 1.16

Thanks in advance.



Question about RexNodeExtractor formatting UDF names

2023-05-08 Thread Chai Kelun
Hi Flink Team:

I have a question about RexNodeExtractor in Flink 1.16.0. I am trying to push 
down UDFs (with function names in the format ST_XXX, including underscores, 
e.g. ST_Contains) into TableSourceScan, and I have implemented applyFilters and 
handling of pushdown functions based on SupportFilterPushdown API. However, 
line 529 of RexNodeExtractor removes the underscore from the function name, 
which prevents the operator from being pushed down.
I tried hacking this logic and comparing the query plan:
[Before modify RexNodeExtractor#replace()]

== Abstract Syntax Tree ==
LogicalProject(id=[$0], name=[$1])
+- LogicalFilter(condition=[ST_Contains($2, ST_MakePoint(6, 2))])
   +- LogicalTableScan(table=[[default_catalog, default_database, extTable]])

== Optimized Physical Plan ==
Calc(select=[id, name], where=[ST_Contains(fence, ST_MakePoint(6, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, extTable]], 
fields=[id, name, fence])

== Optimized Execution Plan ==
Calc(select=[id, name], where=[ST_Contains(fence, ST_MakePoint(6, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, extTable]], 
fields=[id, name, fence])


[After modify RexNodeExtractor#replace()]
== Abstract Syntax Tree ==
LogicalProject(id=[$0], name=[$1])
+- LogicalFilter(condition=[ST_Contains($2, ST_MakePoint(6, 2))])
   +- LogicalTableScan(table=[[default_catalog, default_database, extTable]])

== Optimized Physical Plan ==
Calc(select=[id, name])
+- TableSourceScan(table=[[default_catalog, default_database, extTable, 
filter=[ST_Contains(fence, ST_MakePoint(6, 2))]]], fields=[id, name, fence])

== Optimized Execution Plan ==
Calc(select=[id, name])
+- TableSourceScan(table=[[default_catalog, default_database, extTable, 
filter=[ST_Contains(fence, ST_MakePoint(6, 2))]]], fields=[id, name, fence])

We can see that the operator cannot be pushed down because the function names 
are formatted. However, according to the Spatial Query standard, the geospatial 
function names are in the format of ST_xxx. I would like to ask what is the 
idea behind this design ?

Best regards,
Kelun


Re: Question about Flink metrics

2023-05-05 Thread Mason Chen
Hi Neha,

For the jobs you care about, you can attach additional labels using
`scope-variables-additional` [1]. The example on that page shows how to
configure key-value pairs in the option's map syntax. Be sure to replace the
reporter name with the name of your Prometheus reporter!

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/metric_reporters/#scope-variables-additional

Best,
Mason
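
For illustration, a flink-conf.yaml fragment along these lines (the reporter
name "prom" and the labels are made up; double-check the exact map syntax
against the metric reporters page of your Flink version):

```
# flink-conf.yaml (illustrative values)
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.scope.variables.additional: team:payments,criticality:high
```

Every metric emitted by that reporter then carries the extra labels, so in
practice you set them only in the configuration of the clusters running the
jobs you want tagged.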

On Thu, May 4, 2023 at 11:35 PM neha goyal  wrote:

> Hello,
> I have a question about the Prometheus metrics. I am able to fetch the
> metrics from the following expression.
> sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name)
> Now I am interested in only a few jobs and I want to give them a label.
> How to achieve this? How to give an additional label to Flink Prometheus
> metrics so that I can fetch the metrics only for those jobs having that
> label? This tag I need to set on the job level. Few jobs will have that tag
> and others won't.
>
>
>


Question about Flink metrics

2023-05-05 Thread neha goyal
Hello,
I have a question about the Prometheus metrics. I am able to fetch the
metrics from the following expression.
sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name)
Now I am interested in only a few jobs and I want to give them a label. How
can I achieve this? How can I add an additional label to Flink's Prometheus
metrics so that I can fetch the metrics only for the jobs carrying that
label? I need to set this tag at the job level; a few jobs will have the tag
and others won't.


Re: Quick question about flink document.

2023-04-09 Thread Hang Ruan
Hi, Dongwoo,

I think there is no problem with this part. It describes snapshotting
operator state, which is checkpointing. The checkpoint is coordinated by the
JobManager and written to the configured checkpoint storage.

Best,
Hang

Feng Jin  wrote on Mon, Apr 10, 2023 at 00:32:

> Hi Dongwoo
>
>
> This can be quite confusing.
> Before Flink 1.13, Flink's statebackend was actually a hybrid concept that
> included three types of statebackends:
> *MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.
>
> The default *MemoryStateBackend* uses heap as the backend, and the state
> is stored in the JobManager.
>
>
> You can refer to this migration document for more information:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends
> .
>
>
> Best
> Feng
>
> On Sun, Apr 9, 2023 at 10:23 PM Dongwoo Kim 
> wrote:
>
>> Hi community, I’m new to flink and trying to learn about the concepts of
>> flink to prepare migrating heron application to flink.
>> I have a quick question about this flink document.
>> (
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state
>> )
>>
>> What I understood is states are stored in configured state backend which
>> can be either task manager’s heap or rocksdb.
>> And snapshots of checkpoint is stored by default in job manager’s heap
>> and mostly in distributed file system.
>> But in the document it says like below and it is confusing to me. Isn’t
>> the second line talking about checkpoint storage or checkpoint backend? Not
>> state backend? Thanks in advance, enjoy your weekend!
>>
>> *"Because the state of a snapshot may be large, it is stored in a
>> configurable state backend
>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/>.
>> By default, this is the JobManager’s memory, but for production use a
>> distributed reliable storage should be configured (such as HDFS)” *
>>
>


Re: Quick question about flink document.

2023-04-09 Thread Shammon FY
Hi Dongwoo

I think there are two configurations about state, one is state backend and
the other is snapshot storage. Flink will create a snapshot for each state
when the stateful operator collects all checkpoint barriers.

As @Feng mentioned above, users can configure a different state backend with
the option state.backend.

The snapshot of the state can be stored in the JobManager. When the state is
large, Flink supports storing the snapshot in a distributed storage with the
option state.checkpoints.dir.

Best,
Shammon FY
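
For illustration, a minimal sketch tying the two options to their programmatic
equivalents (the paths are placeholders, and EmbeddedRocksDBStateBackend needs
the flink-statebackend-rocksdb dependency):

```
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                        // checkpoint every 60s
        env.setStateBackend(new EmbeddedRocksDBStateBackend());  // working state -> RocksDB (state.backend)
        env.getCheckpointConfig()
                .setCheckpointStorage("hdfs:///flink/checkpoints"); // snapshots -> DFS (state.checkpoints.dir)
        // ... define sources/operators/sinks, then env.execute(...);
    }
}
```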


On Mon, Apr 10, 2023 at 12:31 AM Feng Jin  wrote:

> Hi Dongwoo
>
>
> This can be quite confusing.
> Before Flink 1.13, Flink's statebackend was actually a hybrid concept that
> included three types of statebackends:
> *MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.
>
> The default *MemoryStateBackend* uses heap as the backend, and the state
> is stored in the JobManager.
>
>
> You can refer to this migration document for more information:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends
> .
>
>
> Best
> Feng
>
> On Sun, Apr 9, 2023 at 10:23 PM Dongwoo Kim 
> wrote:
>
>> Hi community, I’m new to flink and trying to learn about the concepts of
>> flink to prepare migrating heron application to flink.
>> I have a quick question about this flink document.
>> (
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state
>> )
>>
>> What I understood is states are stored in configured state backend which
>> can be either task manager’s heap or rocksdb.
>> And snapshots of checkpoint is stored by default in job manager’s heap
>> and mostly in distributed file system.
>> But in the document it says like below and it is confusing to me. Isn’t
>> the second line talking about checkpoint storage or checkpoint backend? Not
>> state backend? Thanks in advance, enjoy your weekend!
>>
>> *"Because the state of a snapshot may be large, it is stored in a
>> configurable state backend
>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/>.
>> By default, this is the JobManager’s memory, but for production use a
>> distributed reliable storage should be configured (such as HDFS)” *
>>
>


Re: Quick question about flink document.

2023-04-09 Thread Feng Jin
Hi Dongwoo


This can be quite confusing.
Before Flink 1.13, Flink's statebackend was actually a hybrid concept that
included three types of statebackends:
*MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.

The default *MemoryStateBackend* uses heap as the backend, and the state is
stored in the JobManager.


You can refer to this migration document for more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends
.


Best
Feng

On Sun, Apr 9, 2023 at 10:23 PM Dongwoo Kim  wrote:

> Hi community, I’m new to flink and trying to learn about the concepts of
> flink to prepare migrating heron application to flink.
> I have a quick question about this flink document.
> (
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state
> )
>
> What I understood is states are stored in configured state backend which
> can be either task manager’s heap or rocksdb.
> And snapshots of checkpoint is stored by default in job manager’s heap and
> mostly in distributed file system.
> But in the document it says like below and it is confusing to me. Isn’t
> the second line talking about checkpoint storage or checkpoint backend? Not
> state backend? Thanks in advance, enjoy your weekend!
>
> *"Because the state of a snapshot may be large, it is stored in a
> configurable state backend
> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/>.
> By default, this is the JobManager’s memory, but for production use a
> distributed reliable storage should be configured (such as HDFS)” *
>


Quick question about flink document.

2023-04-09 Thread Dongwoo Kim
Hi community, I’m new to Flink and trying to learn about the concepts of Flink
to prepare for migrating a Heron application to Flink.
I have a quick question about this flink document.
(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state)

What I understood is that state is stored in the configured state backend, which
can be either the task manager's heap or RocksDB.
And checkpoint snapshots are stored by default in the job manager's heap and
mostly in a distributed file system.
But in the document it says like below and it is confusing to me. Isn’t the 
second line talking about checkpoint storage or checkpoint backend? Not state 
backend? Thanks in advance, enjoy your weekend!

"Because the state of a snapshot may be large, it is stored in a configurable 
state backend 
<https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/state_backends/>.
 By default, this is the JobManager’s memory, but for production use a 
distributed reliable storage should be configured (such as HDFS)” 

Question regarding Flink-CDC for postgres

2023-03-03 Thread Leon Xu
Hi Flink Users,

We are looking to use Flink Postgres CDC but then we noticed it only
supports single thread reading during the snapshot reading phase.
Is there any plan to support parallel snapshot reading for postgres in the
near future? Looks like this is not an issue for mysql.


Thanks
Leon


Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Andrew Otto
Thank you!

On Mon, Feb 13, 2023 at 5:55 AM Dian Fu  wrote:

> Thanks Andrew, I think this is a valid advice. I will update the
> documentation~
>
> Regards,
> Dian
>
> ,
>
> On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto  wrote:
>
>> Question about side outputs and OutputTags in pyflink.  The docs
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
>> say we are supposed to
>>
>> yield output_tag, value
>>
>> Docs then say:
>> > For retrieving the side output stream you use getSideOutput(OutputTag) on
>> the result of the DataStream operation.
>>
>> From this, I'd expect that calling datastream.get_side_output would be
>> optional.   However, it seems that if you do not call
>> datastream.get_side_output, then the main datastream will have the record
>> destined to the output tag still in it, as a Tuple(output_tag, value).
>> This caused me great confusion for a while, as my downstream tasks would
>> break because of the unexpected Tuple type of the record.
>>
>> Here's an example of the failure using side output and ProcessFunction
>> in the word count example.
>> <https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>
>>
>> I'd expect that just yielding an output_tag would make those records be
>> in a different datastream, but apparently this is not the case unless you
>> call get_side_output.
>>
>> If this is the expected behavior, perhaps the docs should be updated to
>> say so?
>>
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>
>>
>>
>>


Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Dian Fu
Thanks Andrew, I think this is a valid advice. I will update the
documentation~

Regards,
Dian

,

On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto  wrote:

> Question about side outputs and OutputTags in pyflink.  The docs
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
> say we are supposed to
>
> yield output_tag, value
>
> Docs then say:
> > For retrieving the side output stream you use getSideOutput(OutputTag) on
> the result of the DataStream operation.
>
> From this, I'd expect that calling datastream.get_side_output would be
> optional.   However, it seems that if you do not call
> datastream.get_side_output, then the main datastream will have the record
> destined to the output tag still in it, as a Tuple(output_tag, value).
> This caused me great confusion for a while, as my downstream tasks would
> break because of the unexpected Tuple type of the record.
>
> Here's an example of the failure using side output and ProcessFunction in
> the word count example.
> <https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>
>
> I'd expect that just yielding an output_tag would make those records be in
> a different datastream, but apparently this is not the case unless you call
> get_side_output.
>
> If this is the expected behavior, perhaps the docs should be updated to
> say so?
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
>
>


Pyflink Side Output Question and/or suggested documentation change

2023-02-10 Thread Andrew Otto
Question about side outputs and OutputTags in pyflink.  The docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
say we are supposed to

yield output_tag, value

Docs then say:
> For retrieving the side output stream you use getSideOutput(OutputTag) on
the result of the DataStream operation.

From this, I'd expect that calling datastream.get_side_output would be
optional.   However, it seems that if you do not call
datastream.get_side_output, then the main datastream will have the record
destined to the output tag still in it, as a Tuple(output_tag, value).
This caused me great confusion for a while, as my downstream tasks would
break because of the unexpected Tuple type of the record.

Here's an example of the failure using side output and ProcessFunction in
the word count example.
<https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>

I'd expect that just yielding an output_tag would make those records be in
a different datastream, but apparently this is not the case unless you call
get_side_output.

If this is the expected behavior, perhaps the docs should be updated to say
so?

-Andrew Otto
 Wikimedia Foundation


Question regarding java.lang.NoSuchMethodError in flink-s3-fs-hadoop.jar

2023-01-29 Thread Leon Xu
Hi Flink Users,

I am getting this exception after upgrading flink-s3-fs-hadoop from 1.15.3
to 1.16.0.

*java.lang.NoSuchMethodError:
'com.google.protobuf.Descriptors$FileDescriptor
org.apache.hadoop.security.proto.SecurityProtos.getDescriptor()'*

After looking into the content of the flink-s3-fs-hadoop:1.16.0 jar. It
looks like in the org.apache.hadoop.security.proto.SecurityProtos class,
the method signature is:
*public static
org.apache.hadoop.thirdparty.protobuf.Descriptors$FileDescriptor
getDescriptor();*
while in flink-s3-fs-hadoop:1.15.3 jar, I can see the correct method
signature in SecurityProtos.class below:
*public static com.google.protobuf.Descriptors$FileDescriptor
getDescriptor();*

I guess that's why I see the exception in 1.16.0 but not in 1.15.3. I am
wondering if there's any workaround/solution for that exception in 1.16.0?


Thanks
Leon


Re: Flink Forward Session Question

2023-01-02 Thread Márton Balassi
Hi Rion,

Unlike the previous Flink Forwards, to the best of my knowledge the latest
edition was not uploaded to YouTube. It might make sense to reach out to
the authors directly.

On Sat, Dec 31, 2022 at 5:35 PM Rion Williams  wrote:

> Hey Flinkers,
>
> Firstly, early Happy New Year’s to everyone in the community. I’ve been
> digging a bit into exactly-once processing with Flink and Pinot and I came
> across this session from Flink Foward last year:
>
> -
> https://www.slideshare.net/FlinkForward/exactlyonce-financial-data-processing-at-scale-with-flink-and-pinot
>
> I was curious if anyone knew if this session was recorded, as the deck
> itself seemed to have quite a bit of value. I figured the mailing list
> would be a reasonable place to ask.
>
> Thanks in advance,
>
> Rion
>


Flink Forward Session Question

2022-12-31 Thread Rion Williams
Hey Flinkers,

Firstly, early Happy New Year’s to everyone in the community. I’ve been digging 
a bit into exactly-once processing with Flink and Pinot and I came across this 
session from Flink Foward last year:

- 
https://www.slideshare.net/FlinkForward/exactlyonce-financial-data-processing-at-scale-with-flink-and-pinot

I was curious if anyone knew if this session was recorded, as the deck itself 
seemed to have quite a bit of value. I figured the mailing list would be a 
reasonable place to ask.

Thanks in advance,

Rion

Re: Question about match_recognize clause in Flink

2022-12-22 Thread Martijn Visser
Hi Marjan,

That's rather weird, because PyFlink uses the same implementation. Could
you file a Jira ticket? If not, let me know and I'll create one for you.

Best regards,

Martijn

On Thu, Dec 22, 2022 at 11:37 AM Marjan Jordanovski 
wrote:

> Hello,
>
> I am using custom made connector to create Source table in this way:
>
> create table Source (
> ts TIMESTAMP(3),
> instance STRING,
> sservice STRING,
> logdatetime STRING,
> threadid STRING,
> level STRING,
> log_line STRING
> ) with (
> 'connector'='lokiquery',
> 'host'='',
> 'lokiqueryparamsstring'='query={instance="test",
> service="test"}=5000=2022-12-15T16:40:09.560Z=2022-12-15T16:58:09.570Z'
> );
>
> In this table I successfully store data from the specified time range from
> loki. Data is coming as a batch. (not stream)
>
> Then I want to create another table that will look for patterns in the
> log_line column from the Source table. I am doing following:
>
> SELECT *
> FROM Source
> MATCH_RECOGNIZE (
> ORDER BY ts
> MEASURES
> START_ROW.ts AS start_ts,
> END_ROW.ts AS end_ts
> ONE ROW PER MATCH
> AFTER MATCH SKIP TO LAST END_ROW
> PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
> DEFINE
> START_ROW AS START_ROW.log_line SIMILAR TO
> '%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%',
> MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse -
> DSResponse: List with%',
> END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%'
> ) MR;
>
> And when using python's pyflink, this works just fine!
> But when I try the same thing in flink sql cli, I get strange error after
> executing second table:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties: convention=LOGICAL,
> FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is LogicalMatch[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant
> part of the original plan is as follows
> 167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]],
> outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO
> LAST(_UTF-16LE'END_ROW')],
> pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false),
> PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)),
> PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)),
> PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))],
> isStrictStarts=[false], isStrictEnds=[false], subsets=[[]],
> patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0),
> _UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'),
> SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List
> with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]],
> inputFields=[[ts, instance, service, logdatetime, threadid, level,
> log_line]])
>   1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]],
> table=[[default_catalog, default_database, Source]])
>
> In python, where this works, these are only configs that I use for table
> environment (of course I also include jar for my custom connector) :
> env_settings = EnvironmentSettings.in_batch_mode()
> t_env = TableEnvironment.create(env_settings)
> t_env.get_config().get_configuration().set_string("parallelism.default",
> "1")
>
> Therefore I set these values in flink sql table:
> SET 'execution.runtime-mode' = 'batch';
> SET 'parallelism.default' = '1';
>
> But it didn't help. Does anyone have any idea what could be causing this
> issue?
>
> Thank you,
> Marjan
>


Question about match_recognize clause in Flink

2022-12-22 Thread Marjan Jordanovski
Hello,

I am using custom made connector to create Source table in this way:

create table Source (
ts TIMESTAMP(3),
instance STRING,
sservice STRING,
logdatetime STRING,
threadid STRING,
level STRING,
log_line STRING
) with (
'connector'='lokiquery',
'host'='',
'lokiqueryparamsstring'='query={instance="test",
service="test"}=5000=2022-12-15T16:40:09.560Z=2022-12-15T16:58:09.570Z'
);

In this table I successfully store data from the specified time range from
loki. Data is coming as a batch. (not stream)

Then I want to create another table that will look for patterns in the
log_line column from the Source table. I am doing following:

SELECT *
FROM Source
MATCH_RECOGNIZE (
ORDER BY ts
MEASURES
START_ROW.ts AS start_ts,
END_ROW.ts AS end_ts
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST END_ROW
PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
DEFINE
START_ROW AS START_ROW.log_line SIMILAR TO
'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%',
MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse -
DSResponse: List with%',
END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%'
) MR;

And when using python's pyflink, this works just fine!
But when I try the same thing in flink sql cli, I get strange error after
executing second table:

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
enough rules to produce a node with desired properties: convention=LOGICAL,
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalMatch[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant
part of the original plan is as follows
167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]],
outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO
LAST(_UTF-16LE'END_ROW')],
pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false),
PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)),
PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)),
PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))],
isStrictStarts=[false], isStrictEnds=[false], subsets=[[]],
patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0),
_UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'),
SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List
with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]],
inputFields=[[ts, instance, service, logdatetime, threadid, level,
log_line]])
  1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]],
table=[[default_catalog, default_database, Source]])

In python, where this works, these are only configs that I use for table
environment (of course I also include jar for my custom connector) :
env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("parallelism.default",
"1")

Therefore I set these values in flink sql table:
SET 'execution.runtime-mode' = 'batch';
SET 'parallelism.default' = '1';

But it didn't help. Does anyone have any idea what could be causing this
issue?

Thank you,
Marjan


Re: question about Async IO

2022-11-04 Thread David Anderson
Yes, that will work as you expect. So long as you don't put another shuffle
or rebalance in between, the keyed partitioning that's already in place
will carry through the async i/o operator, and beyond. In most cases you
can even use reinterpretAsKeyedStream on the output (so long as you haven't
done something to emit records that break the contract it expects).

David
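
For illustration, a minimal sketch of that pattern with made-up types and a
dummy lookup. Note that orderedWait only preserves the order in which results
are emitted; it does not stop several requests for the same key from being in
flight at once, so serializing calls per key still needs the function-local
bookkeeping Galen describes:

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class KeyedAsyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "a", "c");
        // keyBy before the async operator: all records with the same key hit the same subtask
        KeyedStream<String, String> keyed = input.keyBy(v -> v);

        DataStream<String> enriched = AsyncDataStream.orderedWait(
                keyed,
                new RichAsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                        // placeholder for the real asynchronous lookup
                        CompletableFuture
                                .supplyAsync(() -> value + "-enriched")
                                .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
                    }
                },
                30, TimeUnit.SECONDS, 100);

        // no shuffle in between, same parallelism: the keyed partitioning carries through
        DataStreamUtils.reinterpretAsKeyedStream(enriched, v -> v.split("-")[0])
                .print();

        env.execute("keyed async sketch");
    }
}
```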

On Wed, Nov 2, 2022 at 4:19 PM Galen Warren  wrote:

> Thanks, that makes sense and matches my understanding of how it works.
>
> In my case, I don't actually need access to keyed *state*; I just want to
> make sure that all elements with the same key are routed to the same
> instance of the async function. (Without going into too much detail, the
> reason for this is that I want to invoke async operations for the same key
> in sequence, i.e. not have two in-flight async operations for the same key
> at the same time. I can accomplish this with a local map of in-flight async
> operations, in the function, so long as all inputs with the same keys get
> routed to the same instance of the function.)
>
> So far as I can tell, if I create a keyed stream and then use it as an
> input to an async function, the keys will be distributed across the async
> function instances the way I want, even if keyed state is inaccessible.
> Anyway, that's what I'm looking to confirm.
>
> On Wed, Nov 2, 2022 at 5:14 AM Filip Karnicki 
> wrote:
>
>> Hi Galen
>>
>> I was thinking about the same thing recently and reached a point where I
>> see that async io does not have access to the keyed state because:
>>
>> "* State related apis in
>> [[org.apache.flink.api.common.functions.RuntimeContext]] are not supported
>>  * yet because the key may get changed while accessing states in the
>> working thread."
>>
>> I don't think that the key can change at any random time here, because of
>>
>> "A common confusion that we want to explicitly point out here is that the
>> AsyncFunction is not called in a multi-threaded fashion. There exists only
>> one instance of the AsyncFunction and it is called sequentially for each
>> record in the respective partition of the stream"
>> From:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
>>
>> So if the RichAsyncFunctionRuntimeContext had access to a
>> KeyedStateBackend and since it's basically a facade on top of
>> RuntimeContext. we could (maybe) change the method signature for something
>> like getState to include the key, and run
>> keyedStateBackend.setCurrentKey(key) before continuing with anything else.
>>
>>
>> Anyone - please stop me if I'm talking nonsense
>>
>>
>> On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi Galen,
>>> i will tell from my experience as a Flink user and developer of Flink
>>> jobs.
>>>
>>>
>>>
>>> *"if the input to an AsyncFunction is a keyed stream, can I assume that
>>> all input elements with the same key will be handled by the same instance
>>> of the async operator"*
>>> From what I know (and someone can correct me if I'm wrong) this is
>>> possible. However you have to make sure that there is no Re-balance or
>>> re-shuffle between those operators. For example operators after first
>>> .keyBy(..) call must have same parallelism level.
>>>
>>> Regarding:
>>> " I have a situation where I would like to enforce that async operations
>>> associated with a particular key happen sequentially,"
>>>
>>> This is also possible as far as I know. In fact I was implementing
>>> streaming pipeline with similar requirements like
>>> *"maintaining order of events withing keyBy group across multiple
>>> operators including Async operators". *
>>> We achieved that with same thing -> making sure that all operators in
>>> entire pipeline except Source and Sink had exact same parallelism level.
>>> Additional thing to remember here is that if you call .keyBy(...) again
>>> but with different key extractor, then original order might not be
>>> preserved since keyBy will execute re-shuffle/re-balance.
>>>
>>> We were also using reinterpretAsKeyedStream feature [1] after async
>>> operators to avoid calling ".keyBay" multiple times in pipeline. Calling
>>> .keyBy always has negative impact on performance.
>>> With reinterpretAsKeyedStream we were able to use keyed operators with
>>> access to keyed state after Async operators.

Re: Question about UDF randomly processed input row twice

2022-11-03 Thread yuxia
Thanks for your explanation. The execution plan for the SQL `INSERT INTO 
print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM 
datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is: 

StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id]) 
  StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))]) 
    StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id]) 

From the plan we can see that the UDF is called twice in the StreamPhysicalCalc 
(once in the projection and once in the predicate), and as a result each row 
appears to be processed twice. 
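One thing worth checking here (a sketch only, not a confirmed fix for FLINK-29855): a ScalarFunction can declare itself non-deterministic, which tells the planner that two occurrences of RandomUdf(id) in the plan must not be treated as interchangeable. Whether that removes the duplicated evaluation depends on the planner version, but it is the documented way to express the property; the eval body below is just a stand-in for the randomised UDF from this thread.

import java.nio.charset.StandardCharsets;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class RandomUdf extends ScalarFunction {

    public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer input) {
        // Stand-in for the randomised behaviour discussed above: sometimes null, sometimes bytes.
        return Math.random() < 0.5 ? null : input.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Declare that repeated calls with the same argument may return different results,
    // so the planner must not assume two RandomUdf(id) expressions are equivalent.
    @Override
    public boolean isDeterministic() {
        return false;
    }
}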

Best regards, 
Yuxia 


发件人: "Xinyi Yan"  
收件人: "yuxia"  
抄送: "User"  
发送时间: 星期五, 2022年 11 月 04日 上午 5:28:30 
主题: Re: Question about UDF randomly processed input row twice 

Ok. The datagen connector with the sequence option can reproduce this issue easily, and it 
also produces an incorrect result. I have a sequence generated by datagen that runs from 1 
to 5, and the UDF randomly returns either null or bytes. Surprisingly, not only was the UDF 
executed twice, but the WHERE clause also did not enforce the `IS NOT NULL` condition. This 
is a big shock on my side; the `IS NOT NULL` condition is a fundamental SQL feature and it 
should not break. I have updated my findings in FLINK-29855 
(https://issues.apache.org/jira/browse/FLINK-29855), and here are the repro steps: 

Query: 
INSERT INTO print_table 
  SELECT * FROM ( 
    SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable 
  ) AS ET 
  WHERE ET.`id_in_bytes` IS NOT NULL 

Result: 
+I[ null , 1] 
+I[[50], 2] 
+I[ null , 4] 


UDF 
public @DataTypeHint( "Bytes" ) byte [] eval(@DataTypeHint( "INT" ) Integer 
intputNum) { byte [] results = 
intputNum.toString().getBytes(StandardCharsets.UTF_8); int randomNumber = (( 
int ) ( Math .random() * (10 - 1))) + 1;
    LOG.info( "[*][*][*] input num is {} and random number is {}. [*][*][*]" , 
intputNum, randomNumber); if (randomNumber % 2 == 0) {
      LOG.info( "### ### input bytes {} and num {}.   ### ### DEBUG ### ### 
duplicated call??? ### DEBUG  ### ### " , results, intputNum); return results;
    }
    LOG.info( "*** *** input bytes {} and num {}." , results, intputNum); 
return null ;
  } 

Log: 
2022-11-02 13:38:56,765 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:56,766 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,763 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:02,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### 

Re: Question about UDF randomly processed input row twice

2022-11-03 Thread Xinyi Yan
 54, 48, 55] and num
-453705607.
2022-11-02 13:39:07,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num
-1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:07,763 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num
-1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:08,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num
-1627597417.
2022-11-02 13:39:09,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num
596520501.
2022-11-02 13:39:10,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num
1361162843.
2022-11-02 13:39:11,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num
2048051791.
2022-11-02 13:39:12,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num
-306603835.


On Thu, Nov 3, 2022 at 3:04 AM yuxia  wrote:

> The datagen may produce rows with the same values.
>
> From my side, in Flink the UDF shouldn't process one row twice; otherwise
> it would be a critical bug.
>
> Best regards,
> Yuxia
>
> --
> *发件人: *"Xinyi Yan" 
> *收件人: *"User" 
> *发送时间: *星期四, 2022年 11 月 03日 上午 6:59:20
> *主题: *Question about UDF randomly processed input row twice
>
> Hi all,
> I found a weird UDF behavior, and it's a single thread that processes UDF
> twice, see FLINK-29855 <https://issues.apache.org/jira/browse/FLINK-29855> for
> more details. Basically, I created a datagen table with a random integer (1
> row per second) and passed this value into the UDF. Inside UDF, I just
> simply mod the input number, convert the integer to a byte array, and then
> logged it for debugging purposes. As you can see, some of the rows have
> been called twice inside UDF. Not sure if this duplicated UDF call is
> expected, and not sure why it doesn't constantly produce duplicated calls
> for all rows. In any case of concern about the local env setup, I only have
> 1 task manager and 1 task slot in my local Flink cluster.
>
> Thanks!
>
> UDF
>
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer 
> intputNum) {
> byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
> if (intputNum % 2 == 0) {
>   LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### 
> duplicated call??? ### DEBUG  ### ### ", results, intputNum);
>   return results;
> }
> LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
> return null;
>   }
>
>
> Main class DDLs
>
> tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 
> 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
> "id  INT\n" +
> ") WITH (\n" +
> "'connector' = 'datagen',\n" +
> "'number-of-rows' = '100',\n" +
> "'rows-per-second' = '1'\n" +
> ")");
> tEnv.executeSql("CREATE TABLE print_table (\n" +
> "id_in_bytes  VARBINARY,\n" +
> "id  INT\n" +
> ") WITH (\n" +
> "'connector' = 'print'\n" +
> ")");
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT 
> IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE 
> ET.`id_in_bytes` IS NOT NULL");
>
>
> Logging
>
> 2022-11-02 13:38:58,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:59,759 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** 
> input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num *-1800690437*.

Re: Question about UDF randomly processed input row twice

2022-11-03 Thread yuxia
The datagen may produce rows with the same values. 

From my side, in Flink the UDF shouldn't process one row twice; otherwise it 
would be a critical bug. 

Best regards, 
Yuxia 


发件人: "Xinyi Yan"  
收件人: "User"  
发送时间: 星期四, 2022年 11 月 03日 上午 6:59:20 
主题: Question about UDF randomly processed input row twice 

Hi all, 
I found a weird UDF behavior, and it's a single thread that processes UDF 
twice, see [ https://issues.apache.org/jira/browse/FLINK-29855 | FLINK-29855 ] 
for more details. Basically, I created a datagen table with a random integer (1 
row per second) and passed this value into the UDF. Inside UDF, I just simply 
mod the input number, convert the integer to a byte array, and then logged it 
for debugging purposes. As you can see, some of the rows have been called twice 
inside UDF. Not sure if this duplicated UDF call is expected, and not sure why 
it doesn't constantly produce duplicated calls for all rows. In any case of 
concern about the local env setup, I only have 1 task manager and 1 task slot 
in my local Flink cluster. 

Thanks! 

UDF 
public @DataTypeHint( "Bytes" ) byte [] eval(@DataTypeHint( "INT" ) Integer 
intputNum) { byte [] results = 
intputNum.toString().getBytes(StandardCharsets.UTF_8); if (intputNum % 2 == 0) {
      LOG.info( "### ### input bytes {} and num {}.   ### ### DEBUG ### ### 
duplicated call??? ### DEBUG  ### ### " , results, intputNum); return results;
    }
    LOG.info( "*** *** input bytes {} and num {}." , results, intputNum); 
return null ;
  } 

Main class DDLs 
tEnv.executeSql( "CREATE FUNCTION IntInputUdf AS 
'org.apache.flink.playgrounds.spendreport.IntInputUdf' " );        
tEnv.executeSql( "CREATE TABLE datagenTable (\n" + "    id  INT\n" + ") WITH 
(\n" + " 'connector' = 'datagen' ,\n" + " 'number-of-rows' = '100' ,\n" + " 
'rows-per-second' = '1' \n" + ")" );        
tEnv.executeSql( "CREATE TABLE print_table (\n" + "    id_in_bytes  
VARBINARY,\n" + "    id  INT\n" + ") WITH (\n" + " 'connector' = 'print' \n" + 
")" );        
tEnv.executeSql( "INSERT INTO print_table SELECT * FROM ( SELECT 
IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE 
ET.`id_in_bytes` IS NOT NULL" ); 

Logging 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437 .
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483 .
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 



Question about UDF randomly processed input row twice

2022-11-02 Thread Xinyi Yan
Hi all,
I found a weird UDF behavior: a single thread processes the UDF
twice, see FLINK-29855 for
more details. Basically, I created a datagen table with a random integer (1
row per second) and passed this value into the UDF. Inside the UDF, I simply
mod the input number, convert the integer to a byte array, and then
log it for debugging purposes. As you can see, some of the rows have
been processed twice inside the UDF. I am not sure whether this duplicated UDF
call is expected, nor why it doesn't produce duplicated calls for all rows. In
case there is any concern about the local env setup: I only have
1 task manager and 1 task slot in my local Flink cluster.

Thanks!

UDF

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer
intputNum) {
byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
if (intputNum % 2 == 0) {
  LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ###
### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
  return results;
}
LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
return null;
  }


Main class DDLs

tEnv.executeSql("CREATE FUNCTION IntInputUdf AS
'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
"id  INT\n" +
") WITH (\n" +
"'connector' = 'datagen',\n" +
"'number-of-rows' = '100',\n" +
"'rows-per-second' = '1'\n" +
")");
tEnv.executeSql("CREATE TABLE print_table (\n" +
"id_in_bytes  VARBINARY,\n" +
"id  INT\n" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT
IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET
WHERE ET.`id_in_bytes` IS NOT NULL");


Logging

2022-11-02 13:38:58,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num
*1506311954*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG
### ###
2022-11-02 13:38:58,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num
*1506311954*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG
### ###
2022-11-02 13:38:59,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num
*-1800690437*.
2022-11-02 13:39:00,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ***
*** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num
*1428877483*.
2022-11-02 13:39:01,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num
*-1794263686*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG
### ###
2022-11-02 13:39:01,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ###
### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num
*-1794263686*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG
### ###


Re: question about Async IO

2022-11-02 Thread Galen Warren
Thanks, that makes sense and matches my understanding of how it works.

In my case, I don't actually need access to keyed *state*; I just want to
make sure that all elements with the same key are routed to the same
instance of the async function. (Without going into too much detail, the
reason for this is that I want to invoke async operations for the same key
in sequence, i.e. not have two in-flight async operations for the same key
at the same time. I can accomplish this with a local map of in-flight async
operations, in the function, so long as all inputs with the same keys get
routed to the same instance of the function.)

So far as I can tell, if I create a keyed stream and then use it as an
input to an async function, the keys will be distributed across the async
function instances the way I want, even if keyed state is inaccessible.
Anyway, that's what I'm looking to confirm.
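For what it's worth, a rough sketch of the "local map of in-flight operations" idea described above (purely illustrative: the Tuple2 input shape, the key field, and the external call are made up, and error handling plus cleanup of finished map entries are omitted):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Input: (key, payload); output: the service response as a String.
public class SequentialPerKeyAsync extends RichAsyncFunction<Tuple2<String, String>, String> {

    // Tail of the chain of in-flight calls per key. Only asyncInvoke (the task thread)
    // reads or writes this map; completion callbacks never touch it.
    private transient Map<String, CompletableFuture<String>> inFlight;

    @Override
    public void open(Configuration parameters) {
        inFlight = new HashMap<>();
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> in, ResultFuture<String> resultFuture) {
        CompletableFuture<String> previous =
                inFlight.getOrDefault(in.f0, CompletableFuture.completedFuture(null));

        // Start this element's call only after the previous call for the same key has finished.
        CompletableFuture<String> current =
                previous.thenCompose(ignored -> callExternalService(in.f1));

        current.thenAccept(response -> resultFuture.complete(Collections.singleton(response)));
        inFlight.put(in.f0, current);
    }

    // Placeholder for a real asynchronous client call.
    private CompletableFuture<String> callExternalService(String payload) {
        return CompletableFuture.supplyAsync(() -> "processed:" + payload);
    }
}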

On Wed, Nov 2, 2022 at 5:14 AM Filip Karnicki 
wrote:

> Hi Galen
>
> I was thinking about the same thing recently and reached a point where I
> see that async io does not have access to the keyed state because:
>
> "* State related apis in
> [[org.apache.flink.api.common.functions.RuntimeContext]] are not supported
>  * yet because the key may get changed while accessing states in the
> working thread."
>
> I don't think that the key can change at any random time here, because of
>
> "A common confusion that we want to explicitly point out here is that the
> AsyncFunction is not called in a multi-threaded fashion. There exists only
> one instance of the AsyncFunction and it is called sequentially for each
> record in the respective partition of the stream"
> From:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
>
> So if the RichAsyncFunctionRuntimeContext had access to a
> KeyedStateBackend and since it's basically a facade on top of
> RuntimeContext. we could (maybe) change the method signature for something
> like getState to include the key, and run
> keyedStateBackend.setCurrentKey(key) before continuing with anything else.
>
>
> Anyone - please stop me if I'm talking nonsense
>
>
> On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi Galen,
>> i will tell from my experience as a Flink user and developer of Flink
>> jobs.
>>
>>
>>
>> *"if the input to an AsyncFunction is a keyed stream, can I assume that
>> all input elements with the same key will be handled by the same instance
>> of the async operator"*
>> From what I know (and someone can correct me if I'm wrong) this is
>> possible. However you have to make sure that there is no Re-balance or
>> re-shuffle between those operators. For example operators after first
>> .keyBy(..) call must have same parallelism level.
>>
>> Regarding:
>> " I have a situation where I would like to enforce that async operations
>> associated with a particular key happen sequentially,"
>>
>> This is also possible as far as I know. In fact I was implementing
>> streaming pipeline with similar requirements like
>> *"maintaining order of events withing keyBy group across multiple
>> operators including Async operators". *
>> We achieved that with same thing -> making sure that all operators in
>> entire pipeline except Source and Sink had exact same parallelism level.
>> Additional thing to remember here is that if you call .keyBy(...) again
>> but with different key extractor, then original order might not be
>> preserved since keyBy will execute re-shuffle/re-balance.
>>
>> We were also using reinterpretAsKeyedStream feature [1] after async
>> operators to avoid calling ".keyBay" multiple times in pipeline. Calling
>> .keyBy always has negative impact on performance.
>> With reinterpretAsKeyedStream we were able to use keyed operators with
>> access to keyed state after Async operators.
>>
>> Hope that helped.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>>
>>
>>
>>
>>
>>
>> pt., 14 paź 2022 o 19:11 Galen Warren 
>> napisał(a):
>>
>>> I have a question about Flink's Async IO support: Async I/O | Apache
>>> Flink
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>
>>> .
>>>
>>> I understand that access to state is not supported in an AsyncFunction.
>>> However, if the input to an AsyncFunction is a keyed stream, can I assume
>>> that all input elements with the same key will be handled by the same
>>> instance of the async operator, as would normally be the case with keyed
>>> streams/operators?

Re: question about Async IO

2022-11-02 Thread Filip Karnicki
Hi Galen

I was thinking about the same thing recently and reached a point where I
see that async io does not have access to the keyed state because:

"* State related apis in
[[org.apache.flink.api.common.functions.RuntimeContext]] are not supported
 * yet because the key may get changed while accessing states in the
working thread."

I don't think that the key can change at any random time here, because of

"A common confusion that we want to explicitly point out here is that the
AsyncFunction is not called in a multi-threaded fashion. There exists only
one instance of the AsyncFunction and it is called sequentially for each
record in the respective partition of the stream"
From:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

So if the RichAsyncFunctionRuntimeContext had access to a
KeyedStateBackend (and since it's basically a facade on top of
RuntimeContext), we could (maybe) change the method signature for something
like getState to include the key, and run
keyedStateBackend.setCurrentKey(key) before continuing with anything else.


Anyone - please stop me if I'm talking nonsense


On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi Galen,
> i will tell from my experience as a Flink user and developer of Flink jobs.
>
>
>
> *"if the input to an AsyncFunction is a keyed stream, can I assume that
> all input elements with the same key will be handled by the same instance
> of the async operator"*
> From what I know (and someone can correct me if I'm wrong) this is
> possible. However you have to make sure that there is no Re-balance or
> re-shuffle between those operators. For example operators after first
> .keyBy(..) call must have same parallelism level.
>
> Regarding:
> " I have a situation where I would like to enforce that async operations
> associated with a particular key happen sequentially,"
>
> This is also possible as far as I know. In fact I was implementing
> streaming pipeline with similar requirements like
> *"maintaining order of events withing keyBy group across multiple
> operators including Async operators". *
> We achieved that with same thing -> making sure that all operators in
> entire pipeline except Source and Sink had exact same parallelism level.
> Additional thing to remember here is that if you call .keyBy(...) again
> but with different key extractor, then original order might not be
> preserved since keyBy will execute re-shuffle/re-balance.
>
> We were also using reinterpretAsKeyedStream feature [1] after async
> operators to avoid calling ".keyBay" multiple times in pipeline. Calling
> .keyBy always has negative impact on performance.
> With reinterpretAsKeyedStream we were able to use keyed operators with
> access to keyed state after Async operators.
>
> Hope that helped.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
>
> Regards,
> Krzysztof Chmielewski
>
>
>
>
>
>
>
> pt., 14 paź 2022 o 19:11 Galen Warren 
> napisał(a):
>
>> I have a question about Flink's Async IO support: Async I/O | Apache
>> Flink
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>
>> .
>>
>> I understand that access to state is not supported in an AsyncFunction.
>> However, if the input to an AsyncFunction is a keyed stream, can I assume
>> that all input elements with the same key will be handled by the same
>> instance of the async operator, as would normally be the case with keyed
>> streams/operators?
>>
>> I'm asking because I have a situation where I would like to enforce that
>> async operations associated with a particular key happen sequentially, i.e.
>> if two elements come through with the same key, I need  the async operation
>> for the second to happen after the async operation for the first one
>> completes. I think I can achieve this using a local map of "in flight"
>> async operations in the operator itself, but only if I can rely on all
>> input elements with the same key being processed by the same async operator.
>>
>> If anyone can confirm how this works, I'd appreciate it. Thanks.
>>
>


Re: question about Async IO

2022-10-14 Thread Krzysztof Chmielewski
Hi Galen,
i will tell from my experience as a Flink user and developer of Flink jobs.



*"if the input to an AsyncFunction is a keyed stream, can I assume that all
input elements with the same key will be handled by the same instance of
the async operator"*
From what I know (and someone can correct me if I'm wrong) this is
possible. However, you have to make sure that there is no re-balance or
re-shuffle between those operators. For example, operators after the first
.keyBy(..) call must have the same parallelism level.

Regarding:
" I have a situation where I would like to enforce that async operations
associated with a particular key happen sequentially,"

This is also possible as far as I know. In fact, I was implementing a
streaming pipeline with similar requirements, like
*"maintaining order of events within a keyBy group across multiple operators
including Async operators". *
We achieved that the same way -> making sure that all operators in the
entire pipeline except Source and Sink had the exact same parallelism level.
An additional thing to remember here is that if you call .keyBy(...) again but
with a different key extractor, then the original order might not be preserved,
since keyBy will execute a re-shuffle/re-balance.

We were also using the reinterpretAsKeyedStream feature [1] after async
operators to avoid calling ".keyBy" multiple times in the pipeline. Calling
.keyBy always has a negative impact on performance.
With reinterpretAsKeyedStream we were able to use keyed operators with
access to keyed state after Async operators.

Hope that helped.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/

Regards,
Krzysztof Chmielewski







pt., 14 paź 2022 o 19:11 Galen Warren  napisał(a):

> I have a question about Flink's Async IO support: Async I/O | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>
> .
>
> I understand that access to state is not supported in an AsyncFunction.
> However, if the input to an AsyncFunction is a keyed stream, can I assume
> that all input elements with the same key will be handled by the same
> instance of the async operator, as would normally be the case with keyed
> streams/operators?
>
> I'm asking because I have a situation where I would like to enforce that
> async operations associated with a particular key happen sequentially, i.e.
> if two elements come through with the same key, I need  the async operation
> for the second to happen after the async operation for the first one
> completes. I think I can achieve this using a local map of "in flight"
> async operations in the operator itself, but only if I can rely on all
> input elements with the same key being processed by the same async operator.
>
> If anyone can confirm how this works, I'd appreciate it. Thanks.
>


question about Async IO

2022-10-14 Thread Galen Warren
I have a question about Flink's Async IO support: Async I/O | Apache Flink
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>
.

I understand that access to state is not supported in an AsyncFunction.
However, if the input to an AsyncFunction is a keyed stream, can I assume
that all input elements with the same key will be handled by the same
instance of the async operator, as would normally be the case with keyed
streams/operators?

I'm asking because I have a situation where I would like to enforce that
async operations associated with a particular key happen sequentially, i.e.
if two elements come through with the same key, I need  the async operation
for the second to happen after the async operation for the first one
completes. I think I can achieve this using a local map of "in flight"
async operations in the operator itself, but only if I can rely on all
input elements with the same key being processed by the same async operator.

If anyone can confirm how this works, I'd appreciate it. Thanks.


Re: Question about SQL gateway

2022-10-12 Thread Ww J
Thanks Xuyang.

Jack

> On Oct 12, 2022, at 8:46 AM, Xuyang  wrote:
> 
> Hi, currently I think there is no HA for the gateway. When the gateway crashes, 
> a job that is being submitted synchronously will be cancelled, while an async job 
> will continue running. When the gateway restarts, the async jobs can be found by the 
> gateway again. BTW, the work on HA is in continuous progress.
> At 2022-10-11 13:35:50, "Ww J"  wrote:
>> Hi,
>> 
>> I submit a stream job from the SQL gateway. The stream job keeps outputting 
>> results to the SQL gateway. If the SQL gateway restarts or crashes, the 
>> stream job will continue running. After the SQL gateway restarts, how can I get 
>> the results of the stream job?
>> 
>> Thanks.
>> 
>> Jack



Question about SQL gateway

2022-10-10 Thread Ww J
Hi,

I submit a stream job from the SQL gateway. The stream job keeps outputting 
results to the SQL gateway. If the SQL gateway restarts or crashes, the stream 
job will continue running. After the SQL gateway restarts, how can I get the 
results of the stream job? 

Thanks.

Jack

Question about Flink Broadcast State event ordering

2022-10-04 Thread Qing Lim
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcasted elements and 
non-broadcasted elements, right? 
The broadcasted elements should arrive in the same order across all tasks, right? 

For example, given a broadcasted stream A and a non-broadcasted stream B: 

When joining A and B, elements from A should always reach all tasks in the same 
order, right? It's just the interleaving of A and B that might differ across 
tasks. Did I understand it correctly? I wasn't sure because it's not clear to me 
from just reading the doc; happy to update the doc once it's clarified here.

Kind regards.



Re: Question regarding to debezium format

2022-09-29 Thread Ali Bahadir Zeybek
Hello Edwin,

Would you mind sharing a simple Flink SQL DDL for the table you are creating
with the Kafka connector and the debezium-avro-confluent format?

Also, can you elaborate on the mechanism that initially publishes to the
schema registry, and share the corresponding schema?

In a nutshell, such error messages usually indicate that a field which is
nullable is not defined as nullable in the Avro schema.
Sincerely,

Ali

On Thu, Sep 29, 2022 at 12:27 PM Martijn Visser 
wrote:

> Hi Edwin,
>
> I'm suspecting that's because those fields are considered metadata which
> are treated separately. There's
> https://issues.apache.org/jira/browse/FLINK-20454 for adding the metadata
> support for the Debezium format with a PR provided, but not yet reviewed.
> If you could have a look at the PR and if it would work, we can see if we
> can get that merged in.
>
> Best regards,
>
> Martijn
>
> On Wed, Sep 28, 2022 at 9:35 AM Edwin  wrote:
>
>> Hi guys,
>>
>> I was trying to use flink sql to consume data from kafka source, the
>> format of which is debezium-avro-confluent. And I encountered a
>> AvroTypeException saying that "Found something, expecting union", where
>> something is not a type but a field that I defined in the schema registery.
>> So I looked into the source code and what I found
>> in 
>> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.createDebeziumAvroRowType
>> was a segement of comment saying that "Debezium Avro contains other
>> information, e.g. "source", "ts_ms", but we don't need them". I am
>> wondering why don't we need it? Both source and ts_ms are in my schema, and
>> I assume that the absence of source and ts_ms caused the exception
>> described above.
>> I am using Flink 1.15. Any help would be highly appreciated! Thanks!
>>
>


Re: Question regarding to debezium format

2022-09-29 Thread Martijn Visser
Hi Edwin,

I'm suspecting that's because those fields are considered metadata which
are treated separately. There's
https://issues.apache.org/jira/browse/FLINK-20454 for adding the metadata
support for the Debezium format with a PR provided, but not yet reviewed.
If you could have a look at the PR and if it would work, we can see if we
can get that merged in.

Best regards,

Martijn

On Wed, Sep 28, 2022 at 9:35 AM Edwin  wrote:

> Hi guys,
>
> I was trying to use flink sql to consume data from kafka source, the
> format of which is debezium-avro-confluent. And I encountered a
> AvroTypeException saying that "Found something, expecting union", where
> something is not a type but a field that I defined in the schema registery.
> So I looked into the source code and what I found
> in 
> org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.createDebeziumAvroRowType
> was a segement of comment saying that "Debezium Avro contains other
> information, e.g. "source", "ts_ms", but we don't need them". I am
> wondering why don't we need it? Both source and ts_ms are in my schema, and
> I assume that the absence of source and ts_ms caused the exception
> described above.
> I am using Flink 1.15. Any help would be highly appreciated! Thanks!
>


Question regarding to debezium format

2022-09-28 Thread Edwin
Hi guys,


I was trying to use Flink SQL to consume data from a Kafka source whose format is 
debezium-avro-confluent, and I encountered an AvroTypeException saying 
"Found something, expecting union", where "something" is not a type but a field 
that I defined in the schema registry. 
So I looked into the source code, and what I found in 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.createDebeziumAvroRowType
 was a segment of comment saying that "Debezium Avro contains other 
information, e.g. "source", "ts_ms", but we don't need them". I am wondering 
why we don't need them? Both source and ts_ms are in my schema, and I assume that 
the absence of source and ts_ms caused the exception described above. 
I am using Flink 1.15. Any help would be highly appreciated! Thanks!
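For context, a table of the kind described here is usually declared with only the row's data columns; the Debezium envelope fields (source, ts_ms, before/after) are interpreted by the format itself rather than declared in the SQL schema. A minimal sketch in the same executeSql style used elsewhere on this list, with the topic, columns, and endpoints as made-up placeholders (tEnv is assumed to be a TableEnvironment):

// Hypothetical table: only the payload columns are declared; the format turns the
// Debezium envelope into insert/update/delete changelog rows.
tEnv.executeSql(
    "CREATE TABLE customers (\n" +
    "  id     INT,\n" +
    "  name   STRING,\n" +
    "  email  STRING\n" +
    ") WITH (\n" +
    "  'connector' = 'kafka',\n" +
    "  'topic' = 'dbserver1.inventory.customers',\n" +
    "  'properties.bootstrap.servers' = 'kafka:9092',\n" +
    "  'scan.startup.mode' = 'earliest-offset',\n" +
    "  'format' = 'debezium-avro-confluent',\n" +
    "  'debezium-avro-confluent.url' = 'http://schema-registry:8081'\n" +
    ")");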

Re: A question about restoring state with an additional variable with kryo

2022-09-20 Thread Vishal Santoshi
Thanks, I'll check it out.

On Sun, Sep 18, 2022 at 7:33 PM David Anderson  wrote:

> Vishal,
>
> If you decide you can't live with dropping that state, [1] is a complete
> example showing how to migrate from Kryo by using the state processor API.
>
> David
>
> [1]
> https://www.docs.immerok.cloud/docs/cookbook/migrating-state-away-from-kryo/
>
>
> On Fri, Sep 16, 2022 at 8:32 AM Vishal Santoshi 
> wrote:
>
>> Thank you for the clarification. I thought so to,
>>
>> Unfortunately my state are generics based and those are definitely not
>> treated as a POJO , though it has all the constructs ( no arg constructor,
>> getters/setters etc ). I will likely take an at least once hit by
>>
>> Changing the uid of that specific Operator, and restart with Allow
>> non-restored state ... This will ignore state that cannot be restored (
>> for the previous uid ) , construct state for the new uid  and not affect
>> other operators ( including the kafka consumer operators ). I can live with
>> it, I think.
>>
>> On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Hi Vishal,
>>>
>>>
>>>
>>> Good news and bad news :
>>>
>>>
>>>
>>>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>>>- Good: not all is lost here,
>>>   - If you happen to have state that you cannot afford to lose, you
>>>   can transcode it by means of the savepoint API [2],
>>>   - However, this takes quite some effort
>>>- In general, if you ever plan to migrate/extend your schemas,
>>>choose a data type that supports schema migration [1],
>>>- In your case, PoJo types would be the closest to your original
>>>implementation
>>>- You can disable Kryo in configuration to avoid this situation in
>>>the future, by the way,
>>>- Kryo serializer is quite slow compared to the other options and I
>>>believe it is only there as a (emergency) fallback solution: [3]
>>>
>>>
>>>
>>> Feel free to ask for clarification 
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>>>
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>
>>> [3]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Vishal Santoshi 
>>> *Sent:* Friday, September 16, 2022 1:17 AM
>>> *To:* user 
>>> *Subject:* Re: A question about restoring state with an additional
>>> variable with kryo
>>>
>>>
>>>
>>>
>>>
>>>
>>> The exception thrown is as follows. I realize that it is trying to read
>>> the long value. How do I signal to kryo that it is OK and that he object
>>> can have a default value
>>>
>>>
>>>
>>> Caused by: java.io.EOFException: No more bytes left.
>>>
>>> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
>>> .require(NoFetchingInput.java:80)
>>>
>>> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>>
>>> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>>
>>> at com.esotericsoftware.kryo.serializers.
>>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>>>
>>> at com.esotericsoftware.kryo.serializers.
>>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>>>
>>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>>>
>>> at com.esotericsoftware.kryo.serializers.ObjectField.read(
>>> ObjectField.java:113)
>>>
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>>> FieldSerializer.java:528)
>>>
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>>> .deserialize(KryoSerializer.java:354)

Re: A question about restoring state with an additional variable with kryo

2022-09-18 Thread David Anderson
Vishal,

If you decide you can't live with dropping that state, [1] is a complete
example showing how to migrate from Kryo by using the state processor API.

David

[1]
https://www.docs.immerok.cloud/docs/cookbook/migrating-state-away-from-kryo/


On Fri, Sep 16, 2022 at 8:32 AM Vishal Santoshi 
wrote:

> Thank you for the clarification. I thought so to,
>
> Unfortunately my state are generics based and those are definitely not
> treated as a POJO , though it has all the constructs ( no arg constructor,
> getters/setters etc ). I will likely take an at least once hit by
>
> Changing the uid of that specific Operator, and restart with Allow
> non-restored state ... This will ignore state that cannot be restored (
> for the previous uid ) , construct state for the new uid  and not affect
> other operators ( including the kafka consumer operators ). I can live with
> it, I think.
>
> On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hi Vishal,
>>
>>
>>
>> Good news and bad news :
>>
>>
>>
>>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>>- Good: not all is lost here,
>>   - If you happen to have state that you cannot afford to lose, you
>>   can transcode it by means of the savepoint API [2],
>>   - However, this takes quite some effort
>>- In general, if you ever plan to migrate/extend your schemas, choose
>>a data type that supports schema migration [1],
>>- In your case, PoJo types would be the closest to your original
>>implementation
>>- You can disable Kryo in configuration to avoid this situation in
>>the future, by the way,
>>- Kryo serializer is quite slow compared to the other options and I
>>believe it is only there as a (emergency) fallback solution: [3]
>>
>>
>>
>> Feel free to ask for clarification 
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>
>> [3]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>
>>
>>
>>
>>
>>
>>
>> *From:* Vishal Santoshi 
>> *Sent:* Friday, September 16, 2022 1:17 AM
>> *To:* user 
>> *Subject:* Re: A question about restoring state with an additional
>> variable with kryo
>>
>>
>>
>>
>>
>>
>> The exception thrown is as follows. I realize that it is trying to read
>> the long value. How do I signal to kryo that it is OK and that he object
>> can have a default value
>>
>>
>>
>> Caused by: java.io.EOFException: No more bytes left.
>>
>> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
>> .require(NoFetchingInput.java:80)
>>
>> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>
>> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>
>> at com.esotericsoftware.kryo.serializers.
>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>>
>> at com.esotericsoftware.kryo.serializers.
>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>>
>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>>
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:113)
>>
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>> .deserialize(KryoSerializer.java:354)
>>
>> at org.apache.flink.api.common.typeutils.CompositeSerializer
>> .deserialize(CompositeSerializer.java:156)
>>
>> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(
>> RocksDBValueState.java:89)
>>
>>
>>
>> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>> << How do I make sure that when reconstituting the state, kryo does not
>> complain? It tries to map the previous state to the new definition of Class
>> A and complains that it cannot read the value for `String b`.

Re: A question about restoring state with an additional variable with kryo

2022-09-16 Thread Vishal Santoshi
Thank you for the clarification. I thought so too.

Unfortunately my state is generics based, and those types are definitely not
treated as a POJO, though they have all the constructs (no-arg constructor,
getters/setters, etc.). I will likely take an at-least-once hit by changing the
uid of that specific operator and restarting with allow non-restored state...
This will ignore state that cannot be restored (for the previous uid),
construct state for the new uid, and not affect other operators (including the
Kafka consumer operators). I can live with it, I think.
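A rough sketch of that approach (operator, types, uid values, and the savepoint path are made up; the `.uid(...)` call and the allow-non-restored-state restore option are the actual Flink mechanisms being relied on):

// Give the operator a new uid so its old Kryo-serialized state is simply not
// mapped to it on restore; all other operators keep their uids and their state.
SingleOutputStreamOperator<Output> enriched = input
        .keyBy(e -> e.getKey())
        .process(new MyStatefulFunction())
        .uid("my-stateful-operator-v2")   // previously "my-stateful-operator-v1"
        .name("my-stateful-operator");

// The job is then resubmitted from the savepoint with non-restored state allowed,
// e.g. `flink run -s <savepointPath> --allowNonRestoredState ...`, so the leftover
// state registered under the old uid is skipped instead of failing the restore.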

On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Vishal,
>
>
>
> Good news and bad news :
>
>
>
>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>- Good: not all is lost here,
>   - If you happen to have state that you cannot afford to lose, you
>   can transcode it by means of the savepoint API [2],
>   - However, this takes quite some effort
>- In general, if you ever plan to migrate/extend your schemas, choose
>a data type that supports schema migration [1],
>- In your case, PoJo types would be the closest to your original
>implementation
>- You can disable Kryo in configuration to avoid this situation in the
>future, by the way,
>- Kryo serializer is quite slow compared to the other options and I
>believe it is only there as a (emergency) fallback solution: [3]
>
>
>
> Feel free to ask for clarification 
>
>
>
> Thias
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> [3]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>
>
>
>
>
>
>
> *From:* Vishal Santoshi 
> *Sent:* Friday, September 16, 2022 1:17 AM
> *To:* user 
> *Subject:* Re: A question about restoring state with an additional
> variable with kryo
>
>
>
>
>
>
> The exception thrown is as follows. I realize that it is trying to read
> the long value. How do I signal to kryo that it is OK and that he object
> can have a default value
>
>
>
> Caused by: java.io.EOFException: No more bytes left.
>
> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> .require(NoFetchingInput.java:80)
>
> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>
> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>
> at com.esotericsoftware.kryo.serializers.
> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>
> at com.esotericsoftware.kryo.serializers.
> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:113)
>
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:354)
>
> at org.apache.flink.api.common.typeutils.CompositeSerializer
> .deserialize(CompositeSerializer.java:156)
>
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(
> RocksDBValueState.java:89)
>
>
>
> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
> wrote:
>
> << How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
>
>
> >> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `long b`.
>
>
>
> Sorry a typo
>
>
>
>
>
> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
> wrote:
>
> I have state in rocksDB that represents say
>
>
>
> class A {
>
>   String a
>
> }
>
>
>
> I now change my class and add another variable
>
>
>
>
> Class A {
>
>   String a;
>
>   long b = 0;
>
> }
>
>
>
> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.

RE: A question about restoring state with an additional variable with kryo

2022-09-16 Thread Schwalbe Matthias
Hi Vishal,

Good news and bad news :


  *   Bad: Kryo serializer cannot be used for schema evolution, see [1]
  *   Good: not all is lost here,
 *   If you happen to have state that you cannot afford to lose, you can 
transcode it by means of the savepoint API [2],
 *   However, this takes quite some effort
  *   In general, if you ever plan to migrate/extend your schemas, choose a 
data type that supports schema migration [1],
  *   In your case, PoJo types would be the closest to your original 
implementation
  *   You can disable Kryo in configuration to avoid this situation in the 
future, by the way (see the sketch after this list),
  *   Kryo serializer is quite slow compared to the other options and I believe 
it is only there as a (emergency) fallback solution: [3]
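As an aside, a minimal sketch of the "disable Kryo in configuration" point from the list above (this is the real ExecutionConfig switch; the equivalent config option is `pipeline.generic-types: false`):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallback {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail at job build time if any type would fall back to the generic Kryo
        // serializer, instead of silently ending up with state that cannot evolve later.
        env.getConfig().disableGenericTypes();

        // ... build the pipeline as usual ...
    }
}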

Feel free to ask for clarification 

Thias



[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[3] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html



From: Vishal Santoshi 
Sent: Friday, September 16, 2022 1:17 AM
To: user 
Subject: Re: A question about restoring state with an additional variable with 
kryo



The exception thrown is as follows. I realize that it is trying to read the 
long value. How do I signal to kryo that it is OK and that the object can have a 
default value?

Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)

On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
<< How do I make sure that when reconstituting the state, kryo does not 
complain? It tries to map the previous state to the new definition of Class A 
and complains that it cannot read the value for `String b`.

>> How do I make sure that when reconstituting the state, kryo does not 
>> complain? It tries to map the previous state to the new definition of Class 
>> A and complains that it cannot read the value for `long b`.

Sorry a typo


On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
mailto:vishal.santo...@gmail.com>> wrote:
I have state in rocksDB that represents say

class A {
  String a
}

I now change my class and add another variable


Class A {
  String a;
  long b = 0;
}

How do I make sure that when reconstituting the state, kryo does not complain? 
It tries to map the previous state to the new definition of Class A and 
complains that it cannot read the value for `String b`.

Unfortunately the state is not using POJO serializer.

Thanks and Regards.

Vishal






Re: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
The exception thrown is as follows. I realize that it is trying to read the
long value. How do I signal to kryo that it is OK and that the object can
have a default value?

Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:80)
at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
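
One direction that gets tried in this situation is a hand-written Kryo serializer
that reads the old single-field layout and falls back to a default when the new
field is missing. The sketch below is not from this thread: the class and field
names mirror the `class A` example, the serializer name is made up, it assumes
all existing state was written with the old single-field class, and whether a
newly registered serializer is actually used when restoring existing Kryo state
depends on the Flink version and how the Kryo serializer snapshot is reconciled
on restore. Treat it as something to experiment with on a copy of the savepoint,
not a guaranteed fix.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Mirrors the evolved class from this thread.
class A {
    String a;
    long b = 0L;
}

// Hypothetical serializer that tolerates the missing trailing field.
class TolerantASerializer extends Serializer<A> {

    @Override
    public void write(Kryo kryo, Output output, A value) {
        // New layout: always write both fields.
        output.writeString(value.a);
        output.writeLong(value.b);
    }

    @Override
    public A read(Kryo kryo, Input input, Class<A> type) {
        A result = new A();
        result.a = input.readString();
        try {
            // Records written with the new layout carry the long.
            result.b = input.readLong();
        } catch (KryoException e) {
            // Old records run out of bytes here (the truncation typically surfaces
            // as a KryoException wrapping the EOFException in the trace above);
            // keep the default value instead of failing.
            result.b = 0L;
        }
        return result;
    }

    // Registration sketch: must be in effect before the job is restored.
    static void register(StreamExecutionEnvironment env) {
        env.getConfig().registerTypeWithKryoSerializer(A.class, TolerantASerializer.class);
    }
}

If rebuilding the state from scratch is an option, moving the type to something
Flink treats as a POJO (or giving it explicit TypeInformation) is usually the more
sustainable route, since Flink's POJO serializer supports adding fields without
any custom Kryo code.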

On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi 
wrote:

> << How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
> >> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `long b`.
>
> Sorry a typo
>
>
> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
> wrote:
>
>> I have state in rocksDB that represents say
>>
>> class A {
>>   String a
>> }
>>
>> I now change my class and add another variable
>>
>>
>> Class A {
>>   String a;
>>   long b = 0;
>> }
>>
>> How do I make sure that when reconstituting the state, kryo does not
>> complain? It tries to map the previous state to the new definition of Class
>> A and complains that it cannot read the value for `String b`.
>>
>> Unfortunately the state is not using POJO serializer.
>>
>> Thanks and Regards.
>>
>> Vishal
>>
>>
>>
>>
>>


Re: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
<< How do I make sure that when reconstituting the state, kryo does not
complain? It tries to map the previous state to the new definition of Class
A and complains that it cannot read the value for `String b`.

>> How do I make sure that when reconstituting the state, kryo does not
complain? It tries to map the previous state to the new definition of Class
A and complains that it cannot read the value for `long b`.

Sorry a typo


On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi 
wrote:

> I have state in rocksDB that represents say
>
> class A {
>   String a
> }
>
> I now change my class and add another variable
>
>
> Class A {
>   String a;
>   long b = 0;
> }
>
> How do I make sure that when reconstituting the state, kryo does not
> complain? It tries to map the previous state to the new definition of Class
> A and complains that it cannot read the value for `String b`.
>
> Unfortunately the state is not using POJO serializer.
>
> Thanks and Regards.
>
> Vishal
>
>
>
>
>


A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
I have state in rocksDB that represents say

class A {
  String a
}

I now change my class and add another variable


Class A {
  String a;
  long b = 0;
}

How do I make sure that when reconstituting the state, kryo does not
complain? It tries to map the previous state to the new definition of Class
A and complains that it cannot read the value for `String b`.

Unfortunately the state is not using POJO serializer.

Thanks and Regards.

Vishal
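
For state that has not been written yet, a commonly suggested precaution is to
register Kryo-serialized types with a serializer built for field evolution, such
as Kryo's CompatibleFieldSerializer, before the first savepoint that contains
them. The sketch below assumes exactly that situation: it does not repair state
already written with Kryo's default FieldSerializer (the case in this thread), it
trades some space and CPU for the embedded field names, and the class names here
are illustrative only.

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterEvolvableKryoTypes {

    // Stand-in for the thread's class; in a real job this is the existing type.
    static class A {
        String a;
        long b = 0L;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CompatibleFieldSerializer stores field names next to the values, so a
        // record written before `b` existed can still be read after `b` is added.
        // It only affects state written after this registration is in place.
        env.getConfig().registerTypeWithKryoSerializer(A.class, CompatibleFieldSerializer.class);

        // ... define sources, keyed state and sinks as usual, then env.execute(...)
    }
}

The note that the state "is not using POJO serializer" is the crux here: for types
Flink recognizes as POJOs, adding or removing fields is handled by Flink's own
schema evolution with no Kryo configuration at all.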


Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-14 Thread yu'an huang
I guess there may be a class conflict between your user jar and the Flink lib. As for 
the error java.lang.NoClassDefFoundError, it may be caused by an exception thrown 
while initializing the class. I suggest you set the log level to DEBUG and send 
the client log here. Let’s see whether there are any new findings in the client 
log.

Best,
Yuan


> On 13 Sep 2022, at 1:55 PM, zhanghao.c...@outlook.com wrote:
> 
> @h.yuan...@gmail.com Any thoughts on this?
> 
> Best,
> Zhanghao Chen
> From: hjw <1010445...@qq.com>
> Sent: Tuesday, September 13, 2022 11:24
> To: zhanghao.chen ; user 
> Subject: Re: A classloading question about submitting Flink jobs on 
> Yarn-Perjob Mode
>  
> Hi,
> 
> The yarn.classpath.include-user-jar parameter appears as the 
> yarn.per-job-cluster.include-user-jar parameter in Flink 1.14. 
> I have tried DISABLED, FIRST, LAST and ORDER, but the error still happened.
> Best,
> Hjw
>  
> 
> 
> -- Original --
> From: "zhanghao.chen" ;
> Date: Tue, Sep 13, 2022 09:42 AM
> To: "hjw"<1010445...@qq.com>;"user";
> Subject: Re: A classloading question about submitting Flink jobs on 
> Yarn-Perjob Mode
> 
> Hi,
> 
> Did you set any additional classloading-related configs (esp. the 
> yarn.classpath.include-user-jar parameter)?
> 
> Best,
> Zhanghao Chen
> From: hjw <1010445...@qq.com>
> Sent: Tuesday, September 13, 2022 1:58
> To: user 
> Subject: A classloading question about submitting Flink jobs on Yarn-Perjob 
> Mode
>  
> When I submit a job to YARN in per-job mode, an error occurs during the 
> client-side generation of the JobGraph submitted to YARN.
> 
> Error:
> java.lang.NoClassDefFoundError:org/apache/orc/PhysicalWriter
> 
> Because the cluster is public, the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar 
> package is already present in the /opt/flink/lib directory.
> 
> However, this class is included in my jar: it is provided by the orc-core 
> package, which I have packaged into my jar.
> 
> After my attempts, either of the following measures solves my problem.
> 1. Remove the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the /opt/flink/lib 
> directory and package it into my jar instead.
> 2. Put orc-core into the /opt/flink/lib directory.
> 
> However, I would like to know why the error occurs when the 
> flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is placed in the /opt/flink/lib directory 
> and orc-core is packaged into my jar.
> 
> 
> Env:
> Flink version: flink 1.14.0
> 
> Best,
> Hjw
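
Before turning to DEBUG logs, a small stand-alone check can make the suspected
conflict visible. The snippet below is only an illustration (class and variable
names are made up): it lists every jar on the classpath that carries
org/apache/orc/PhysicalWriter.class and then checks that the class can actually
be loaded and initialized with that classloader. The interesting run is with the
same classpath the Flink client uses for JobGraph generation (user jar plus
/opt/flink/lib).

import java.net.URL;
import java.util.Enumeration;

public class WhoProvidesPhysicalWriter {

    public static void main(String[] args) throws Exception {
        String resource = "org/apache/orc/PhysicalWriter.class";
        ClassLoader cl = Thread.currentThread().getContextClassLoader();

        // List every jar (or class directory) that contains a copy of the class.
        Enumeration<URL> copies = cl.getResources(resource);
        if (!copies.hasMoreElements()) {
            System.out.println("No copy of " + resource + " visible to " + cl);
        }
        while (copies.hasMoreElements()) {
            System.out.println("Found " + resource + " in " + copies.nextElement());
        }

        // Check that the class can be loaded and initialized with this loader;
        // a failure here can be the root cause behind a later NoClassDefFoundError.
        Class.forName("org.apache.orc.PhysicalWriter", true, cl);
        System.out.println("org.apache.orc.PhysicalWriter loaded and initialized OK");
    }
}

One plausible explanation for the original symptom, consistent with both fixes
that worked: flink-orc sitting in /opt/flink/lib is loaded by the parent
classloader, which cannot see classes that exist only inside the user jar's child
classloader, so an orc-core that lives only in the fat jar is invisible to it.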



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread zhanghao.chen
@h.yuan...@gmail.com Any thoughts on this?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 11:24
To: zhanghao.chen ; user 
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

The yarn.classpath.include-user-jar parameter appears as the 
yarn.per-job-cluster.include-user-jar parameter in Flink 1.14.
I have tried DISABLED, FIRST, LAST and ORDER, but the error still happened.

Best,
Hjw



-- Original --
From: "zhanghao.chen" ;
Date: Tue, Sep 13, 2022 09:42 AM
To: "hjw"<1010445...@qq.com>;"user";
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

Did you set any additional classloading-related configs (esp. the 
yarn.classpath.include-user-jar parameter)?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 1:58
To: user 
Subject: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

When I submit a job to YARN in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to YARN.

Error:
java.lang.NoClassDefFoundError:org/apache/orc/PhysicalWriter

Because the cluster is public, the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar 
package is already present in the /opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have packaged into my jar.

After my attempts, either of the following measures solves my problem.
1. Remove the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the /opt/flink/lib 
directory and package it into my jar instead.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why the error occurs when the 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is placed in the /opt/flink/lib directory 
and orc-core is packaged into my jar.


Env:
Flink version: flink 1.14.0


Best,
Hjw



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread hjw
Hi,


The yarn.classpath.include-user-jar parameter appears as the 
yarn.per-job-cluster.include-user-jar parameter in Flink 1.14.
I have tried DISABLED, FIRST, LAST and ORDER, but the error still happened.

Best,
Hjw







-- Original --
From: "zhanghao.chen"


