Big Data Contract Roles?

2022-09-14 Thread sri hari kali charan Tummala
Hi Flink Users/ Spark Users,

Is anyone hiring for corp-to-corp contract Big Data roles in Spark (Scala) or
Flink (Scala)?


Thanks
Sri


Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-14 Thread yu'an huang
I guess there may be a class conflict between your user jar and the Flink lib 
directory. A java.lang.NoClassDefFoundError may be caused by an exception 
thrown while initializing a class. I suggest you set the log level to DEBUG and 
send the client log here; let's see whether there are any new findings in the 
client log.
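
For context, client-side DEBUG logging as suggested above is usually enabled in
the CLI logging configuration (a sketch, assuming the default log4j2 setup
shipped with Flink 1.14; the file name may differ in your distribution):

```properties
# conf/log4j-cli.properties -- used by the "flink run" client
rootLogger.level = DEBUG
rootLogger.appenderRef.file.ref = FileAppender
```

The client log then appears under the log/ directory of the Flink distribution
on the submitting machine.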

Best,
Yuan


> On 13 Sep 2022, at 1:55 PM, zhanghao.c...@outlook.com wrote:
> 
> @h.yuan...@gmail.com  Any thoughts on this?
> 
> Best,
> Zhanghao Chen
> From: hjw <1010445...@qq.com>
> Sent: Tuesday, September 13, 2022 11:24
> To: zhanghao.chen ; user 
> Subject: Re: A classloading question about submitting Flink jobs on 
> Yarn-Perjob Mode
>  
> Hi,
> 
> The yarn.classpath.include-user-jar parameter is named 
> yarn.per-job-cluster.include-user-jar in Flink 1.14. 
> I have tried DISABLED, FIRST, LAST, and ORDER, but the error still happens.
> Best,
> Hjw
>  
> 
> 
> -- Original --
> From: "zhanghao.chen" ;
> Date: Tue, Sep 13, 2022 09:42 AM
> To: "hjw"<1010445...@qq.com>;"user";
> Subject: Re: A classloading question about submitting Flink jobs on 
> Yarn-Perjob Mode
> 
> Hi,
> 
> Did you set any additional classloading-related configs (esp. the 
> yarn.classpath.include-user-jar parameter)?
> 
> Best,
> Zhanghao Chen
> From: hjw <1010445...@qq.com>
> Sent: Tuesday, September 13, 2022 1:58
> To: user 
> Subject: A classloading question about submitting Flink jobs on Yarn-Perjob 
> Mode
>  
> When I submit a job to YARN in per-job mode, an error occurs during the 
> client-side generation of the JobGraph submitted to YARN.
> 
> Error:
> java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter
> 
> Because the cluster is shared, the related 
> flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is already in the /opt/flink/lib 
> directory.
> 
> However, this class is also included in my jar: it is provided by the 
> orc-core package, which I have bundled into my jar.
> 
> After some attempts, either of the following measures solves my problem:
> 1. Remove the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar from /opt/flink/lib 
> and package it into my jar.
> 2. Put orc-core into the /opt/flink/lib directory.
> 
> However, I would like to know why an error occurs when the 
> flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is placed in /opt/flink/lib 
> while orc-core is packaged into my jar.
> 
> 
> Env:
> Flink version: flink 1.14.0
> 
> Best,
> Hjw
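
Background for the thread above: which copy of a class wins at runtime is
governed by Flink's classloading order, which can be pinned explicitly (a
sketch; option names and values as in the Flink 1.14 configuration docs):

```yaml
# flink-conf.yaml
# child-first (the default): classes in the user jar shadow those in /opt/flink/lib
classloader.resolve-order: child-first
# parent-first: /opt/flink/lib wins; use when the user jar must reuse the
# library versions already on the cluster
# classloader.resolve-order: parent-first
```

The failing setup (flink-orc in lib, orc-core only in the user jar) likely
splits one library across two classloaders: the parent-loaded flink-orc classes
cannot see orc-core classes that live only in the child (user) classloader.
Both working measures described above keep the library in a single classloader.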



Re: DataStream and DataStreamSource

2022-09-14 Thread Noel OConnor
Awesome, thanks for the info! much appreciated.

On Wed, Sep 14, 2022 at 5:04 PM Jing Ge  wrote:
>
> Hi,
>
> Welcome to the Flink community!
>
> A DataStreamSource is a DataStream. It is normally used as the starting point 
> of a DataStream. All related methods in StreamExecutionEnvironment that 
> create a DataStream actually return a DataStreamSource, because it is where a 
> DataStream starts.
> 
> Commonly, you don't need to care about DataStreamSource; just use DataStream 
> even if methods in StreamExecutionEnvironment return a DataStreamSource [1]. 
> A DataStreamSource created by those methods in StreamExecutionEnvironment will 
> use the built-in SourceTransformation. If you want to modify the configuration 
> of the transformation, you can specifically use the DataStreamSource 
> type (instead of DataStream), which provides some setter methods inherited 
> from SingleOutputStreamOperator.
>
> Best regards,
> Jing
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
>
> On Wed, Sep 14, 2022 at 3:23 PM Noel OConnor  wrote:
>>
>> Hi,
>> I'm new to flink and I'm trying to integrate it with apache pulsar.
>> I've gone through the demos and I get how they work but one aspect
>> that I can't figure out is what's the difference between a DataStream
>> and a DataStreamSource.
>> When would you use one over the other?
>>
>> cheers
>> Noel


Re: DataStream and DataStreamSource

2022-09-14 Thread Jing Ge
Hi,

Welcome to the Flink community!

A DataStreamSource is a DataStream. It is normally used as the starting
point of a DataStream. All related methods in StreamExecutionEnvironment
that create a DataStream actually return a DataStreamSource, because it is
where a DataStream starts.

Commonly, you don't need to care about DataStreamSource; just use DataStream
even if methods in StreamExecutionEnvironment return a DataStreamSource [1].
A DataStreamSource created by those methods in StreamExecutionEnvironment
will use the built-in SourceTransformation. If you want to modify the
configuration of the transformation, you can specifically use the
DataStreamSource type (instead of DataStream), which provides some setter
methods inherited from SingleOutputStreamOperator.

Best regards,
Jing

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
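
To make the distinction concrete, here is a hedged sketch against the Flink
DataStream API (env.fromElements is used purely for illustration; any
source-creating method behaves the same way):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTypeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The common case: ignore the concrete subtype and work with DataStream.
        DataStream<String> words = env.fromElements("flink", "pulsar");

        // Keep the DataStreamSource type only when you need the setters it
        // inherits from SingleOutputStreamOperator, e.g. to configure the
        // source transformation itself:
        DataStreamSource<String> source = env.fromElements("flink", "pulsar");
        source.name("demo-source").setParallelism(1);

        words.print();
        env.execute("source-type-sketch");
    }
}
```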

On Wed, Sep 14, 2022 at 3:23 PM Noel OConnor  wrote:

> Hi,
> I'm new to flink and I'm trying to integrate it with apache pulsar.
> I've gone through the demos and I get how they work but one aspect
> that I can't figure out is what's the difference between a DataStream
> and a DataStreamSource.
> When would you use one over the other?
>
> cheers
> Noel
>


Re: ExecutionMode in ExecutionConfig

2022-09-14 Thread zhanghao.chen
It's added in Flink 1.14: 
https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.14/#expose-a-consistent-globaldataexchangemode.
I'm not sure if there's a way to change this in 1.13.

Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 23:38
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


I can give this a try. Do you know in which Flink version this feature becomes 
available?



ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'? 
It looks like the ExecutionMode in ExecutionConfig does not work for the 
DataStream API.

The default shuffle behavior for the DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING', where upstream and downstream tasks run sequentially 
(downstream tasks start only after the upstream tasks finish). The pipelined 
mode, on the other hand, runs upstream and downstream tasks simultaneously.





Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com; user@flink.apache.org
Subject: RE: ExecutionMode in ExecutionConfig



Hi Zhanghao,



That seems different from what I’m referencing, and is one of my points of 
confusion – the documents refer to the execution mode as BATCH and STREAMING, 
which is different from what the code calls the runtime mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

I’m referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description on it






Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I’ve attached a picture of this job graph - I’m reading from a total of 3 data 
sources – the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API’s job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API’s job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I’m attempting to have 
the 

RE: ExecutionMode in ExecutionConfig

2022-09-14 Thread Hailu, Andreas
I can give this a try. Do you know in which Flink version this feature becomes 
available?

ah

From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig

Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'? 
It looks like the ExecutionMode in ExecutionConfig does not work for the 
DataStream API.

The default shuffle behavior for the DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING', where upstream and downstream tasks run sequentially 
(downstream tasks start only after the upstream tasks finish). The pipelined 
mode, on the other hand, runs upstream and downstream tasks simultaneously.



Best,
Zhanghao Chen

From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com; user@flink.apache.org
Subject: RE: ExecutionMode in ExecutionConfig


Hi Zhanghao,



That seems different from what I'm referencing, and is one of my points of 
confusion - the documents refer to the execution mode as BATCH and STREAMING, 
which is different from what the code calls the runtime mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

I'm referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I'm not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description on it







Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I've attached a picture of this job graph - I'm reading from a total of 3 data 
sources - the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API's job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API's job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I'm attempting to have 
the same behavior between them. I ask about ExecutionMode as I am able to 
replicate this behavior in DataSet by setting the ExecutionMode from the 
default of PIPELINED to BATCH.



Thanks!



best,

ah







Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we 

Re: ExecutionMode in ExecutionConfig

2022-09-14 Thread zhanghao.chen
Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'? 
It looks like the ExecutionMode in ExecutionConfig does not work for the 
DataStream API.

The default shuffle behavior for the DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING', where upstream and downstream tasks run sequentially 
(downstream tasks start only after the upstream tasks finish). The pipelined 
mode, on the other hand, runs upstream and downstream tasks simultaneously.
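
For concreteness, the suggested setting can go into the cluster configuration
or be passed at submission time (a sketch; the option exists since Flink 1.14):

```yaml
# flink-conf.yaml
execution.runtime-mode: BATCH
execution.batch-shuffle-mode: ALL_EXCHANGES_PIPELINED
```

or equivalently on the command line:
flink run -Dexecution.batch-shuffle-mode=ALL_EXCHANGES_PIPELINED ...
Note that pipelined exchanges trade the blocking mode's region-by-region
failover for lower end-to-end latency.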


Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


Hi Zhanghao,



That seems different from what I’m referencing, and is one of my points of 
confusion – the documents refer to the execution mode as BATCH and STREAMING, 
which is different from what the code calls the runtime mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

I’m referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description on it






Best,

Zhanghao Chen



From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I’ve attached a picture of this job graph - I’m reading from a total of 3 data 
sources – the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API’s job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API’s job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I’m attempting to have 
the same behavior between them. I ask about ExecutionMode as I am able to 
replicate this behavior in DataSet by setting the ExecutionMode from the 
default of PIPELINED to BATCH.



Thanks!



best,

ah







Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices



Re: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-14 Thread zhanghao.chen
Flink will not do autoscaling for you, and the parallelism is fixed unless you 
enable reactive mode / the adaptive scheduler. Max parallelism just means the 
maximum parallelism to which you can rescale your job without losing state. 
The max parallelism limit is related to the key-group mechanism Flink uses to 
rescale jobs with keyed state.
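
To illustrate the key-group mechanism mentioned above, here is a simplified,
self-contained sketch (not Flink's actual code: Flink applies a murmur hash to
key.hashCode() and assigns contiguous key-group ranges to subtasks; the
structure, not the exact hash, is the point):

```java
// Simplified sketch of Flink's key-group based rescaling.
// Assumption: plain hashCode() stands in for Flink's murmur-hash of the key.
public class KeyGroupSketch {

    // Each key is assigned to one of maxParallelism key groups. This mapping
    // depends ONLY on maxParallelism, never on the current parallelism --
    // which is why keyed state survives rescaling.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are then distributed over the current subtasks; rescaling
    // just re-partitions whole key groups among subtasks.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int kg = keyGroupFor("user-42", maxParallelism);
        // Same key group regardless of whether the job runs 2 or 16 subtasks:
        System.out.println("key group:       " + kg);
        System.out.println("subtask at p=2:  " + operatorIndexFor(kg, maxParallelism, 2));
        System.out.println("subtask at p=16: " + operatorIndexFor(kg, maxParallelism, 16));
    }
}
```

This also shows why you cannot rescale beyond max parallelism: state hashed
into 128 key groups cannot be split into more than 128 disjoint slices.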

I'm not sure about the future plans for extending reactive mode beyond 
standalone mode, though.

Best,
Zhanghao Chen

From: Erez Yaakov 
Sent: Wednesday, September 14, 2022 20:32
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Maybe 'automatic parallelism change' is not an accurate term for describing 
what I mean, so let me re-phrase it:

Assuming I'm submitting my job with parallelism = 2 and max parallelism = 128 
(default), my expectation is that every operator of the job will actually have 
between 2 and 128 parallel instances at run time.

Meaning, Flink starts my job with 2 instances of every operator, but once it 
realizes that a specific operator has become the bottleneck, it creates 
additional instances of that operator, up to the max of 128. For example, in my 
case the Kafka topic has 16 partitions. Once I increase the load and the 
consumers cannot keep up (but the downstream operators can!), I’d expect Flink 
to increase the actual number of parallel instances of the source operator 
and, if new TaskManager pods are required, to interact directly with k8s (I'm 
working on the native Kubernetes deployment) to spin up additional pods.

Is this the expected behavior of the native Kubernetes deployment? Or is it 
that in this mode the number of actual parallel operator instances is fixed 
per job and does not change dynamically during the job's life?



Thanks for your clarifications. It's really helpful!





From: zhanghao.c...@outlook.com 
Sent: Tuesday, September 13, 2022 4:30 AM
To: Erez Yaakov ; user@flink.apache.org
Subject: Re: Can flink dynamically detect high load and increase the job 
parallelism automatically?



EXTERNAL EMAIL



Hi Erez,



Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own proprietary implementations of Flink autoscaling, though.



Best,

Zhanghao Chen



From: Erez Yaakov <erez.yaa...@niceactimize.com>
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?



Hi,



When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage…) and increase the job parallelism automatically?



I am running flink streaming job on an application mode cluster using native 
k8s.

My streaming job is consuming messages from a kafka topic with 16 partitions, 
parallelism.default is set to 2, no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send messages to the Kafka topic at a high rate, faster than the 
job can consume, and I saw that the consumer lag was increasing. I also saw in 
the Flink UI that the source task was turning red, indicating high usage of 
this task.

Even though I created a high load on the job, I didn't see Flink automatically 
change the parallelism of the job to handle the high load.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add task managers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is in 
MVP state and is only supported in standalone mode, and is not ready yet for 
production use.



Thanks,

Erez

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


RE: ExecutionMode in ExecutionConfig

2022-09-14 Thread Hailu, Andreas
Hi Zhanghao,

That seems different from what I'm referencing, and is one of my points of 
confusion - the documents refer to the execution mode as BATCH and STREAMING, 
which is different from what the code calls the runtime mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

I'm referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I'm not able to find 
documentation on this anywhere.
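
For readers following along, the two similarly named knobs live in different
places (a hedged fragment, not a complete program; class names as in Flink
1.13/1.14):

```java
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Runtime mode: chooses BATCH vs STREAMING semantics for the DataStream API.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// ExecutionMode in ExecutionConfig: chooses PIPELINED vs BATCH data exchanges;
// honored by the DataSet API, largely ignored by the DataStream API.
env.getConfig().setExecutionMode(ExecutionMode.BATCH);
```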



ah

From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description on it



Best,
Zhanghao Chen

From: Hailu, Andreas <andreas.ha...@gs.com>
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org
Subject: ExecutionMode in ExecutionConfig


Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I've attached a picture of this job graph - I'm reading from a total of 3 data 
sources - the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API's job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API's job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I'm attempting to have 
the same behavior between them. I ask about ExecutionMode as I am able to 
replicate this behavior in DataSet by setting the ExecutionMode from the 
default of PIPELINED to BATCH.



Thanks!



best,

ah





Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices





DataStream and DataStreamSource

2022-09-14 Thread Noel OConnor
Hi,
I'm new to Flink and I'm trying to integrate it with Apache Pulsar.
I've gone through the demos and I understand how they work, but one
aspect I can't figure out is the difference between a DataStream
and a DataStreamSource.
When would you use one over the other?

cheers
Noel


RE: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-14 Thread Erez Yaakov
Maybe 'automatic parallelism change' is not an accurate term for describing 
what I mean, so let me re-phrase it:

Assuming I'm submitting my job with parallelism = 2 and max parallelism = 128 
(default), my expectation is that every operator of the job will actually have 
between 2 and 128 parallel instances at run time.
Meaning, Flink starts my job with 2 instances of every operator, but once it 
realizes that a specific operator has become the bottleneck, it creates 
additional instances of that operator, up to the max of 128. For example, in my 
case the Kafka topic has 16 partitions. Once I increase the load and the 
consumers cannot keep up (but the downstream operators can!), I'd expect Flink 
to increase the actual number of parallel instances of the source operator 
and, if new TaskManager pods are required, to interact directly with k8s (I'm 
working on the native Kubernetes deployment) to spin up additional pods.

Is this the expected behavior of the native Kubernetes deployment? Or is it 
that in this mode the number of actual parallel operator instances is fixed 
per job and does not change dynamically during the job's life?

Thanks for your clarifications. It's really helpful!


From: zhanghao.c...@outlook.com 
Sent: Tuesday, September 13, 2022 4:30 AM
To: Erez Yaakov ; user@flink.apache.org
Subject: Re: Can flink dynamically detect high load and increase the job 
parallelism automatically?


EXTERNAL EMAIL

Hi Erez,

Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own proprietary implementations of Flink autoscaling, though.

Best,
Zhanghao Chen
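
For readers who do want to try it, reactive mode on a standalone cluster is
enabled with a single scheduler option (a sketch, per the Flink elastic-scaling
docs; standalone deployments only, not native Kubernetes or YARN):

```yaml
# flink-conf.yaml -- only honored by standalone clusters
scheduler-mode: reactive
```

In reactive mode the job always uses all available slots, so scaling happens
by adding or removing TaskManagers (e.g. via a Kubernetes horizontal pod
autoscaler on a standalone-on-K8s deployment), not by Flink requesting
resources itself.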

From: Erez Yaakov 
mailto:erez.yaa...@niceactimize.com>>
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Hi,



When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage...) and increase the job parallelism automatically?



I am running flink streaming job on an application mode cluster using native 
k8s.

My streaming job is consuming messages from a kafka topic with 16 partitions, 
parallelism.default is set to 2, no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send messages to the Kafka topic at a high rate, faster than the 
job can consume, and I saw that the consumer lag was increasing. I also saw in 
the Flink UI that the source task was turning red, indicating high usage of 
this task.

Even though I created a high load on the job, I didn't see Flink automatically 
change the parallelism of the job to handle the high load.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add task managers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is in 
MVP state and is only supported in standalone mode, and is not ready yet for 
production use.



Thanks,

Erez

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.



Extending RMQSource

2022-09-14 Thread Nadia Mostafa
Hello,

I have extended the RMQSource class and overridden the setupQueue method to
declare a queue and bind it to an exchange.

Now, when I stop the Flink job, the queue is not deleted. I tried to
override cancel() and close() to delete the queue, but I found they are not
called when the job is stopped.

Is there a way to delete that queue?

Thanks in advance!