Re: Production deployment of Flink

2023-12-07 Thread Gyula Fóra
Hi!

We recommend using the community supported Flink Kubernetes Operator:

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.7/docs/try-flink-kubernetes-operator/quick-start/

Cheers,
Gyula

On Thu, Dec 7, 2023 at 6:33 PM Tauseef Janvekar 
wrote:

> Hi All,
>
> I am using Flink in my local setup and it works just fine - I installed it
> following a Confluent example training course. There I had to manually execute
> start-cluster.sh and other steps to start the task managers.
>
> We installed Flink on Kubernetes using the Bitnami Helm chart and it works
> just fine, but we get error messages saying the TM and JM are not able to
> communicate.
> Should we use something else to deploy Flink on Kubernetes?
>
> Can someone please provide instructions on how to set up a production-ready
> Flink deployment for version 1.18 on Kubernetes.
>
> Thanks,
> Tauseef
>


Feature flag functionality on flink

2023-12-07 Thread Oscar Perez via user
Hi,
We would like to enable a sort of feature-flag functionality for Flink jobs.

The idea would be to use broadcast state, read from a configuration topic,
so that ALL operators with logic would listen to this state.

This documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/broadcast_state/

explains how a single operator can use this broadcast state, but the
problem we are having is understanding how we can share the same state
across many different operators. One way is to create multiple streams, one
per operator reading from the same topic, and then connect them to the
respective operators in a KeyedBroadcastProcessFunction, but this seems like overkill.

Is there an elegant solution to this problem?
regards,
Oscar


Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-07 Thread Maximilian Michels
Hey Rui,

+1 for changing the default restart strategy to exponential-delay.
This is something all users eventually run into. They end up changing
the restart strategy to exponential-delay. I think the current
defaults are quite balanced. Restarts happen quickly enough unless
there are consecutive failures, where I think it makes sense to double
the waiting time up to the max.

-Max


On Wed, Dec 6, 2023 at 12:51 AM Mason Chen  wrote:
>
> Hi Rui,
>
> Sorry for the late reply. I was suggesting that perhaps we could do some
> testing with Kubernetes wrt configuring values for the exponential restart
> strategy. We've noticed that the default strategy in 1.17 caused a lot of
> requests to the K8s API server for unstable deployments.
>
> However, people in different Kubernetes setups will have different limits
> so it would be challenging to provide a general benchmark. Another thing I
> found helpful in the past is to refer to Kubernetes--for example, the
> default strategy is exponential for pod restarts and we could draw
> inspiration from what they have set as a general purpose default config.
>
> Best,
> Mason
>
> On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi David and Mason,
> >
> > Thanks for your feedback!
> >
> > To David:
> >
> > > Given that the new default feels more complex than the current behavior,
> > if we decide to do this I think it will be important to include the
> > rationale you've shared in the documentation.
> >
> > Sounds reasonable to me; I will add the related doc if we
> > update the default strategy.
> >
> > To Mason:
> >
> > > I suppose we could do some benchmarking on what works well for the
> > resource providers that Flink relies on e.g. Kubernetes. Based on
> > conferences and blogs,
> > > it seems most people are relying on Kubernetes to deploy Flink and the
> > restart strategy has a large dependency on how well Kubernetes can scale to
> > requests to redeploy the job.
> >
> > Sorry, I didn't understand what type of benchmarking
> > we should do, could you elaborate on it? Thanks a lot.
> >
> > Best,
> > Rui
> >
> > On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:
> >
> >> Hi Rui,
> >>
> >> I suppose we could do some benchmarking on what works well for the
> >> resource providers that Flink relies on e.g. Kubernetes. Based on
> >> conferences and blogs, it seems most people are relying on Kubernetes to
> >> deploy Flink and the restart strategy has a large dependency on how well
> >> Kubernetes can scale to requests to redeploy the job.
> >>
> >> Best,
> >> Mason
> >>
> >> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
> >> wrote:
> >>
> >>> Rui,
> >>>
> >>> I don't have any direct experience with this topic, but given the
> >>> motivation you shared, the proposal makes sense to me. Given that the new
> >>> default feels more complex than the current behavior, if we decide to do
> >>> this I think it will be important to include the rationale you've shared 
> >>> in
> >>> the documentation.
> >>>
> >>> David
> >>>
> >>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
> >>>
>  Hi dear flink users and devs:
> 
>  FLIP-364[1] intends to make some improvements to restart-strategy
>  and discuss updating some of the default values of exponential-delay,
>  and whether exponential-delay can be used as the default
>  restart-strategy.
>  After discussing at dev mail list[2], we hope to collect more feedback
>  from Flink users.
> 
>  # Why does the default restart-strategy need to be updated?
> 
>  If checkpointing is enabled, the default value is fixed-delay with
>  Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>  the job will restart infinitely with high frequency when a job
>  continues to fail.
> 
>  When the Kafka cluster fails, a large number of flink jobs will be
>  restarted frequently. After the kafka cluster is recovered, a large
>  number of high-frequency restarts of flink jobs may cause the
>  kafka cluster to avalanche again.
> 
>  We are considering exponential-delay as the default strategy for
>  a couple of reasons:
> 
>  - The exponential-delay can reduce the restart frequency when
>    a job continues to fail.
>  - It can restart a job quickly when a job fails occasionally.
>  - The restart-strategy.exponential-delay.jitter-factor can avoid
>    restarting multiple jobs at the same time. It’s useful to prevent
>    avalanches.
> 
>  # What are the current default values[4] of exponential-delay?
> 
>  restart-strategy.exponential-delay.initial-backoff : 1s
>  restart-strategy.exponential-delay.backoff-multiplier : 2.0
>  restart-strategy.exponential-delay.jitter-factor : 0.1
>  restart-strategy.exponential-delay.max-backoff : 5 min
>  restart-strategy.exponential-delay.reset-backoff-threshold : 1h
> 
>  backoff-multiplier=2 means 
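
For context, here is a hedged sketch of setting these same values programmatically; it assumes the RestartStrategies.exponentialDelayRestart(...) factory on the DataStream API and simply mirrors the defaults listed above:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExponentialDelaySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Mirrors the exponential-delay defaults listed above.
        env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
                Time.seconds(1),   // initial-backoff
                Time.minutes(5),   // max-backoff
                2.0,               // backoff-multiplier
                Time.hours(1),     // reset-backoff-threshold
                0.1));             // jitter-factor
    }
}

The same effect can be reached in flink-conf.yaml by setting restart-strategy: exponential-delay together with the options listed above.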

Production deployment of Flink

2023-12-07 Thread Tauseef Janvekar
Hi All,

I am using Flink in my local setup and it works just fine - I installed it
following a Confluent example training course. There I had to manually execute
start-cluster.sh and other steps to start the task managers.

We installed Flink on Kubernetes using the Bitnami Helm chart and it works just
fine, but we get error messages saying the TM and JM are not able to communicate.
Should we use something else to deploy Flink on Kubernetes?

Can someone please provide instructions on how to set up a production-ready
Flink deployment for version 1.18 on Kubernetes.

Thanks,
Tauseef


RE: Reading text file from S3

2023-12-07 Thread Fourais
Thank you Jaehyeon Kim. A workaround consists of exporting AWS credentials as
environment variables using https://github.com/linaro-its/aws2-wrap. The command
below will set the environment variables AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN, which makes the Flink code work.
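
For completeness, a minimal sketch of the reading side, assuming the three variables above have already been exported (e.g. via aws2-wrap) before the JVM starts; the bucket path is simply the example from the original mail:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadS3TextSketch {
    public static void main(String[] args) throws Exception {
        // Fail fast if the exported credentials are not visible to this process.
        for (String name : new String[] {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"}) {
            if (System.getenv(name) == null) {
                throw new IllegalStateException("Missing environment variable: " + name);
            }
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.readTextFile("s3://mybucket/folder1/file.txt");
        lines.print();
        env.execute("read-s3-text");
    }
}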

On 2023/12/06 22:57:28 Fourais wrote:
> Hi,
> 
> Using Flink 1.18 and Java 17, I am trying to read a text file from S3 using
> env.readTextFile("s3://mybucket/folder1/file.txt"). When I run the app in
> the IDE, I get the following error:
> 
> Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS
> Credentials provided by DynamicTemporaryAWSCredentialsProvider
> TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider
> EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
> com.amazonaws.SdkClientException: Unable to load AWS credentials from
> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
> 
> I authenticated into AWS using SSO e.g. aws sso login --profile
> my-aws-profile, so I do not have any keys set as environment variables. I
> have tried the different CredentialsProvider options suggested in the error
> message without success.
> 
> Could you help me identify what I am missing?
> 
> Thank you very much for your help,
> /Fourais
> 

Unable to locate full stderr logs while using Flink Operator

2023-12-07 Thread Edgar H
Hi all!

I've just deployed an Apache Beam job using FlinkRunner in k8s and
found that the job failed and shows the following field:

  error: >-
 
{"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.flink.client.program.ProgramInvocationException:
The main method caused an error: Error while translating UnboundedSource:

org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@109c7700","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.util.SerializedThrowable","message":"java.lang.RuntimeException:
Error while translating UnboundedSource:

org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@109c7700","additionalMetadata":{}},{"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.kafka.common.KafkaException:
Failed to construct kafka consumer","additionalMetadata":{}}]}

However, this is not the full trace, and I can't really understand the
reason behind the exception: in the tests I've done locally there is more
log output, and the specific reason only appears further down.

Accessing /opt/flink/log is not an option either, since only stdout
logs are in there, so there is no trace of the errors whatsoever.

I've seen these properties while looking through the docs:

kubernetes.operator.exception.stacktrace.enabled: true
kubernetes.operator.exception.stacktrace.max.length: 2
kubernetes.operator.exception.field.max.length: 2
kubernetes.operator.exception.throwable.list.max.count: 20

But still, the error field is the same and further info is not provided.

What am I missing to see the full stacktrace somewhere using the operator?


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-07 Thread elakiya udhayanan
Hi Chen/ Feng,

Thanks for pointing out the mistake I made; after correcting the query I am
able to run the job with two sinks successfully.

Thanks,
Elakiya
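
For anyone who finds this thread later, here is a minimal sketch of the StatementSet approach suggested below; the datagen/print tables and the queries are placeholders, not the original job:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoSinksSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL statements go through executeSql(); they do not trigger a job.
        tEnv.executeSql("CREATE TABLE src (id STRING, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_a (id STRING, name STRING) WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE TABLE sink_b (id STRING, name STRING) WITH ('connector' = 'print')");

        // One StatementSet groups both INSERTs so they are submitted as a single job.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT id, name FROM src");
        set.addInsertSql("INSERT INTO sink_b SELECT id, name FROM src");
        set.execute();
    }
}

Only the single set.execute() call submits the job, which avoids the "Cannot have more than one execute()" error mentioned below.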

On Thu, Dec 7, 2023 at 4:37 AM Chen Yu  wrote:

> Hi  Chen,
> You should tell Flink which table to insert into by writing “INSERT INTO XXX
> SELECT XXX”.
>
> For a single non-insert query, Flink will collect the output to the console
> automatically, so it also works without adding an insert.
>
> But you must specify the target table explicitly when you need to write
> data to external storage.
>
> For example:
>
> String relateQuery = "insert into xxx select correlator_id, name,
> relationship from Correlation";
>
>
> Best,
> Yu Chen
>
>
> Get Outlook for iOS
> --
> *From:* Zhanghao Chen 
> *Sent:* Wednesday, December 6, 2023 7:21:50 PM
> *To:* elakiya udhayanan ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Elakiya,
>
> You can try executing TableEnvironmentImpl#executeInternal for non-insert
> statements, then using StatementSet.addInsertSql to add multiple insertion
> statements, and finally calling StatementSet#execute.
>
> Best,
> Zhanghao Chen
> --
> *From:* elakiya udhayanan 
> *Sent:* Wednesday, December 6, 2023 17:49
> *To:* user@flink.apache.org 
> *Subject:* Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Team,
>  I would like to know whether it is possible to have two sinks in a
> single Flink job. In my case I am using a Flink SQL based job where I try
> to consume from two different Kafka topics using the CREATE TABLE DDL (as
> below), then use a join condition to correlate them, and at present
> write the result to an external database (PostgreSQL) as a sink. I would like
> to know if I can add another sink so that I can also write it to a Kafka topic
> (as the second sink).
> I tried using two SQL scripts (two CREATE and two INSERT statements) but
> was facing an exception: *"Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> I also tried to use the StatementSet functionality, which again gave me an
> exception: *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used: *String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>


keyBy MapState usage question

2023-12-07 Thread Jake.zhang
Hi all:


KeyBy process function


public class EventKeyedBroadcastProcessFunction
        extends KeyedBroadcastProcessFunction<String, Event, Config, Output> {

    // Event, Config and Output stand in for the actual types of this job.
    private transient MapState<String, String> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // initialize the map state
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map-state", String.class, String.class));
    }

    @Override
    public void processElement(Event value, ReadOnlyContext ctx, Collector<Output> out)
            throws Exception {
        // can't see the state key/value that the onTimer() function set
    }

    @Override
    public void processBroadcastElement(Config value, Context ctx, Collector<Output> out)
            throws Exception {
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
            throws Exception {
        // sets the map state key/value first
    }
}


Why can't the processElement() function see the value that the onTimer() function sets?


thanks.