Re: Using managed keyed state with AsyncIO

2020-08-13 Thread Arvid Heise
Hi KristoffSC,

you are right that state is not shared across operators - I forgot about
that. So the approach would only be valid as is if the state can be
properly separated into two independent subtasks. For example, you need the
state to find the database key and you store the full entry in Flink state
afterwards. Then you could fetch the key in the map before async IO and
keep the full record in the map after async IO.
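
A rough, self-contained sketch of that shape in the DataStream API (all class,
field and key names below, as well as the CompletableFuture stand-in for the
database call, are made up purely for illustration):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class KeyedStateAroundAsyncIoSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "a");

        // 1) keyed map BEFORE async IO: keyed state finds/remembers the database key
        DataStream<Tuple2<String, String>> withDbKey = events
                .keyBy(e -> e)
                .map(new RichMapFunction<String, Tuple2<String, String>>() {
                    private transient ValueState<String> dbKeyState;

                    @Override
                    public void open(Configuration parameters) {
                        dbKeyState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("dbKey", String.class));
                    }

                    @Override
                    public Tuple2<String, String> map(String event) throws Exception {
                        String dbKey = dbKeyState.value();
                        if (dbKey == null) {
                            dbKey = "db-key-" + event; // derive once per key, then remember it
                            dbKeyState.update(dbKey);
                        }
                        return Tuple2.of(event, dbKey);
                    }
                });

        // 2) async IO: stateless, only uses the key computed in the previous map
        DataStream<Tuple2<String, String>> enriched = AsyncDataStream.unorderedWait(
                withDbKey,
                new RichAsyncFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public void asyncInvoke(Tuple2<String, String> in,
                            ResultFuture<Tuple2<String, String>> resultFuture) {
                        CompletableFuture
                                .supplyAsync(() -> "full-record-for-" + in.f1) // stand-in for the real lookup
                                .thenAccept(record -> resultFuture.complete(
                                        Collections.singleton(Tuple2.of(in.f0, record))));
                    }
                },
                30, TimeUnit.SECONDS);

        // 3) keyed map AFTER async IO: keeps the full record in keyed state
        enriched
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, String>, String>() {
                    private transient ValueState<String> fullRecord;

                    @Override
                    public void open(Configuration parameters) {
                        fullRecord = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("fullRecord", String.class));
                    }

                    @Override
                    public String map(Tuple2<String, String> value) throws Exception {
                        fullRecord.update(value.f1);
                        return value.f1;
                    }
                })
                .print();

        env.execute("keyed-state-around-async-io-sketch");
    }
}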

Another approach is to perform some kind of feedback from async IO to the
first map. There is usually a tradeoff between performance (use a Kafka
topic for feedback) and complexity (write some TCP socket magic). I'd
rather recommend to have a look at statefun though [1], which implements
this feedback in an efficient way and provides a good abstraction for
everything that is state-related. Unfortunately, mixing Flink jobs and
statefun applications is still not easily possible - I'm assuming it would
happen in the next major release. But maybe, you can express everything in
statefun, at which point, it's the best choice.

For your question: it shouldn't make any difference, as the function gets
serialized in the main() and deserialized at each JM/TM resulting in many
copies. The only difference is that in your main(), you have one fewer
copy. Since Flink state is only touched in TM, the function instances are
different anyways.

[1] https://github.com/apache/flink-statefun

On Thu, Aug 13, 2020 at 2:53 PM KristoffSC 
wrote:

> Hi Arvid,
> thank you for the respond.
> Yeah I tried to run my job shortly after posting my message and I got
> "State
> is not supported in rich async function" ;)
>
> I came up with a solution that would solve my initial problem -
> concurrent/Async problem of processing messages with the same key but
> unfortunately state is not supported here.
>
> Thank you for the proposition
> source -> keyby -> map (retrieve state) -> async IO (use state) -> map
> (update state)
>
> However I'm a little bit surprised. I thought that state on a keyed stream cannot
> be shared between operators, and here you are suggesting doing that. Is it
> possible then?
>
>
> Using this occasion, I have an additional question: is there any difference
> from Flink's perspective between these two approaches:
>
> MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a
> stateless object, but it uses Flink keyed state.
>
> Setup 1:
>
> source -> keyBy(key) ->  process(pf) -> map() -> process(pf) -> sink
>
> Setup 2:
> source -> keyBy(key) ->  process(new MyProcessFunction()) -> map() ->
> process(new MyProcessFunction()) -> sink
>
>
>
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: TaskManagers are still up even after job execution completed in PerJob deployment mode

2020-08-13 Thread narasimha
Thanks, Till.

Currently, the instance is getting a timeout error and terminating the
TaskManager.

Sure, will try native K8s.

On Thu, Aug 13, 2020 at 3:12 PM Till Rohrmann  wrote:

> Hi Narasimha,
>
> if you are deploying the Flink cluster manually on K8s then there is
> no automatic way of stopping the TaskExecutor/TaskManager pods. This is
> something you have to do manually (similar to a standalone deployment). The
> only clean up mechanism is the automatic termination of the TaskManager
> processes if they cannot connect to the ResourceManager after the specified
> timeout. However, you can use Flink's native K8s integration with which you
> can also deploy a per-job mode cluster [1]. The native K8s integration is
> able to clean up the whole cluster.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
>
> Cheers,
> Till
>
> On Thu, Aug 13, 2020 at 11:26 AM Kostas Kloudas 
> wrote:
>
>> Hi Narasimha,
>>
>> I am not sure why the TMs are not shutting down, as Yun said, so I am
>> cc'ing Till here as he may be able to shed some light.
>> For the application mode, the page in the documentation that you
>> pointed is the recommended way to deploy an application in application
>> mode.
>>
>> Cheers,
>> Kostas
>>
>> On Mon, Aug 10, 2020 at 11:16 AM narasimha 
>> wrote:
>> >
>> > Thanks, Yun for the prompt reply.
>> >
>> > TaskManager was actively looking for ResourceManager, on timeout of 5
>> mins it got terminated.
>> >
>> > Any recommendations around this? Or is this the way this will work.
>> >
>> > What should be done around this to make the application start in
>> application deployment mode?
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-application
>> >
>> > Here it shows invoking the Flink binary to start. Is this the
>> preferred way?
>> >
>> >
>> > On Mon, Aug 10, 2020 at 1:46 PM Yun Tang  wrote:
>> >>
>> >> Hi
>> >>
>> >> From your description, the task managers are still alive even after the job
>> is finished and the job manager has shut down?
>> >> If so, I think this is really weird, could you check what the TM is
>> doing via jstack and the logs in job manager and idle task manager?
>> >> The task manager should be released when the JM is shutting down.
>> >> Moreover, an idle task manager would also be released after 30 seconds by
>> default [1].
>> >>
>> >>
>> >> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resourcemanager-taskmanager-timeout
>> >>
>> >> Best
>> >> Yun Tang
>> >>
>> >>
>> >> 
>> >> From: narasimha 
>> >> Sent: Monday, August 10, 2020 15:36
>> >> To: user@flink.apache.org 
>> >> Subject: TaskManagers are still up even after job execution completed
>> in PerJob deployment mode
>> >>
>> >>
>> >> I'm trying out Flink Per-Job deployment using docker-compose.
>> >>
>> >> Configurations:
>> >>
>> >> version: "2.2"
>> >> jobs:
>> >>   jobmanager:
>> >> build: ./
>> >> image: flink_local:1.1
>> >> ports:
>> >>   - "8081:8081"
>> >> command: standalone-job --job-classname com.organization.BatchJob
>> >> environment:
>> >>   - |
>> >> FLINK_PROPERTIES=
>> >> jobmanager.rpc.address: jobmanager
>> >> parallelism.default: 2
>> >>   taskmanager:
>> >> image: flink_local:1.1
>> >> depends_on:
>> >>   - jobmanager
>> >> command: taskmanager
>> >> scale: 1
>> >> environment:
>> >>   - |
>> >> FLINK_PROPERTIES=
>> >> jobmanager.rpc.address: jobmanager
>> >> taskmanager.numberOfTaskSlots: 2
>> >> parallelism.default: 2
>> >>
>> >> Flink image is extended with job.jar, Job executed successfully.
>> >>
>> >> The JobManager exited after the job completed, but the TaskManager is still
>> running, which is not expected.
>> >>
>> >> Do any configurations have to be added to exit both the JobManager and
>> TaskManager?
>> >>
>> >> Versions:
>> >>
>> >> Flink - 1.11.0
>> >>
>> >> Java - 1.8
>> >>
>> >>
>> >> --
>> >> A.Narasimha Swamy
>> >>
>> >
>> >
>> > --
>> > A.Narasimha Swamy
>> >
>>
>

-- 
A.Narasimha Swamy


Re: Tools for Flink Job performance testing

2020-08-13 Thread narasimha
Thanks, Arvid.

The guide was helpful for getting started with Flink. I'm currently
exploring SQL/Table API.

Will surely come back for queries on it.

On Thu, Aug 13, 2020 at 1:25 PM Arvid Heise  wrote:

> Hi,
>
> performance testing is quite vague. Usually you start by writing a small
> first version of your pipeline and check how well the computation scales on
> your data. Flink's web UI [1] already helps quite well for the first time.
> Usually you'd also add some metric system and look for advanced metrics in
> there [2].
>
> Now, you might be satisfied with your current solution and just start to
> extend it. Or you feel that it's not fast enough or not scaling well enough. Then
> you can tweak your pipeline and perform smaller performance tests on your
> user code. Doing performance tests on the whole pipeline [3] would probably
> be hard if you just started as you also need to understand internals of
> Flink.
>
> I also recommend to use Table API / SQL instead of DataStream if your
> application can be expressed well in relational operations. Table API
> already applies a wide range of optimizations that are much harder to
> implement manually in DataStream API. Table API will also bring you
> noticeable performance improvements over time when you update to a newer
> Flink version.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/application_profiling.html
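>
> For example, a minimal self-contained sketch of that route (the table and field
> names and the datagen/print connectors below are just stand-ins so that the
> snippet can run without any external systems):
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.TableResult;
>
> public class TableApiSketch {
>     public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner().inStreamingMode().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         // stand-in source: random rows, no external system needed
>         tEnv.executeSql(
>                 "CREATE TABLE orders (user_id STRING, amount DOUBLE) WITH ("
>                         + "'connector' = 'datagen', 'rows-per-second' = '5', "
>                         + "'fields.user_id.length' = '2')");
>         // stand-in sink: prints the continuously updated aggregates
>         tEnv.executeSql(
>                 "CREATE TABLE order_stats (user_id STRING, cnt BIGINT, total DOUBLE) "
>                         + "WITH ('connector' = 'print')");
>
>         TableResult result = tEnv.executeSql(
>                 "INSERT INTO order_stats "
>                         + "SELECT user_id, COUNT(*) AS cnt, SUM(amount) AS total "
>                         + "FROM orders GROUP BY user_id");
>         // executeSql submits asynchronously; block so the example keeps running
>         result.getJobClient().get()
>                 .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
>                 .get();
>     }
> }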
>
> On Mon, Aug 10, 2020 at 1:06 PM narasimha  wrote:
>
>> Hi,
>>
>> I'm new to the streaming world, checking on Performance testing tools.
>> Are there any recommended Performance testing tools for Flink?
>>
>> --
>> A.Narasimha Swamy
>>
>
>
>


-- 
A.Narasimha Swamy


Re: [Flink-KAFKA-KEYTAB] Kafkaconsumer error Kerberos

2020-08-13 Thread Vijayendra Yadav
Hi Yangze,

I tried the following: maybe I am missing something.
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
-yt,--yarnship 

Run:
/usr/lib/flink/bin/flink run -m yarn-cluster
-yt ${app_install_path}/conf

My KRB5.conf is in ${app_install_path}/conf on the master node (local build
path).

When this folder is shipped to YARN, how should I reference this KRB5.conf
now in the run command?

I tried like:   -yD java.security.krb5.conf=./krb5.conf\

It didn't work this way. Please suggest: can the file be referenced as a relative path
(./krb5.conf), or what am I misinterpreting?

Note: When we manually updated KRB5.conf on all cluster nodes in
/etc/KRB5.conf it was working. But I am trying to make it available as JVM
property.

Regards,
Vijay


On Thu, Aug 13, 2020 at 9:21 PM Yangze Guo  wrote:

> Hi,
>
> When deploying Flink on Yarn, you could ship krb5.conf with the "--ship"
> command. Notice that this command only supports shipping folders for now.
>
> Best,
> Yangze Guo
>
> On Fri, Aug 14, 2020 at 11:22 AM Vijayendra Yadav 
> wrote:
> >
> > Any inputs ?
> >
> > On Tue, Aug 11, 2020 at 10:34 AM Vijayendra Yadav 
> wrote:
> >>
> >> Dawid, I was able to resolve the keytab issue by passing the service
> name, but now I am facing the KRB5 issue.
> >>
> >> Caused by: org.apache.kafka.common.errors.SaslAuthenticationException:
> Failed to create SaslClient with mechanism GSSAPI
> >> Caused by: javax.security.sasl.SaslException: Failure to initialize
> security context [Caused by GSSException: Invalid name provided (Mechanism
> level: KrbException: Cannot locate default realm)]
> >>
> >> I passed KRB5 from yaml conf file like:
> >>
> >> env.java.opts.jobmanager: -Djava.security.krb5.conf=/path/krb5.conf
> >> env.java.opts.taskmanager: -Djava.security.krb5.conf=/path/krb5.conf
> >>
> >> How can I resolve this? Is there another way to pass KRB5?
> >>
> >> I also tried via option#1 from flink run command -D parameter.
> >>
> >> Regards,
> >> Vijay
> >>
> >>
> >> On Tue, Aug 11, 2020 at 1:26 AM Dawid Wysakowicz <
> dwysakow...@apache.org> wrote:
> >>>
> >>> Hi,
> >>>
> >>> As far as I know the approach 2) is the supported way of setting up
> Kerberos authentication in Flink. In the second approach have you tried
> setting the `sasl.kerberos.service.name` in the configuration of your
> KafkaConsumer/Producer[1]? I think this might be the issue.
> >>>
> >>> Best,
> >>>
> >>> Dawid
> >>>
> >>> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#enabling-kerberos-authentication
> >>>
> >>>
> >>> On 09/08/2020 20:39, Vijayendra Yadav wrote:
> >>>
> >>> Hi Team,
> >>>
> >>> I am trying to stream data from kafkaconsumer using:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> >>>
> >>> Here my KAFKA is Kerberos secured and SSL enabled.
> >>>
> >>> I am running my Flink streaming in yarn-cluster on EMR 5.31.
> >>>
> >>> I have tried to pass keytab/principal in following 2 Ways:
> >>>
> >>> 1) Passing as JVM property in Flink run Command.
> >>>
> >>> /usr/lib/flink/bin/flink run \
> >>>   -yt ${app_install_path}/conf/ \
> >>>   -Dsecurity.kerberos.login.use-ticket-cache=false \
> >>>   -yDsecurity.kerberos.login.use-ticket-cache=false \
> >>>   -Dsecurity.kerberos.login.keytab=${app_install_path}/conf/keytab \
> >>>   -yDsecurity.kerberos.login.keytab=${app_install_path}/conf/.keytab \
> >>>   -Djava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
> >>>   -yDjava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
> >>>   -Dsecurity.kerberos.login.principal=x...@xx.net \
> >>>   -yDsecurity.kerberos.login.principal=x...@xx.net \
> >>>   -Dsecurity.kerberos.login.contexts=Client,KafkaClient \
> >>>   -yDsecurity.kerberos.login.contexts=Client,KafkaClient
> >>>
> >>>
> >>> Here, I am getting the following Error, it seems like KEYTAB Was not
> transported to the run environment and probably not found.
> >>>
> >>> org.apache.kafka.common.KafkaException: Failed to construct kafka
> consumer
> >>> Caused by: java.lang.IllegalArgumentException: Could not find a
> 'KafkaClient' entry in the JAAS configuration. System property
> 'java.security.auth.login.config'
> >>>
> >>> 2) Passing from flink config:  /usr/lib/flink/conf/flink-conf.yaml
> >>>
> >>> security.kerberos.login.use-ticket-cache: false
> >>> security.kerberos.login.keytab:  ${app_install_path}/conf/keytab
> >>> security.kerberos.login.principal:  x...@xx.net
> >>> security.kerberos.login.contexts: Client,KafkaClient
> >>>
> >>> Here, I am getting the following Error,
> >>>
> >>> org.apache.kafka.common.KafkaException: Failed to construct kafka
> consumer
> >>> Caused by: org.apache.kafka.common.KafkaException:
> java.lang.IllegalArgumentException: No serviceName defined in either JAAS
> or Kafka config
> >>>
> >>>
> >>> Could you please help find, what are probable causes and resolution?
> >>>
> 

Re: [Flink-KAFKA-KEYTAB] Kafkaconsumer error Kerberos

2020-08-13 Thread Yangze Guo
Hi,

When deploying Flink on Yarn, you could ship krb5.conf with the "--ship"
command. Notice that this command only supports shipping folders for now.

Best,
Yangze Guo

On Fri, Aug 14, 2020 at 11:22 AM Vijayendra Yadav  wrote:
>
> Any inputs ?
>
> On Tue, Aug 11, 2020 at 10:34 AM Vijayendra Yadav  
> wrote:
>>
>> Dawid, I was able to resolve the keytab issue by passing the service name, 
>> but now I am facing the KRB5 issue.
>>
>> Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: 
>> Failed to create SaslClient with mechanism GSSAPI
>> Caused by: javax.security.sasl.SaslException: Failure to initialize security 
>> context [Caused by GSSException: Invalid name provided (Mechanism level: 
>> KrbException: Cannot locate default realm)]
>>
>> I passed KRB5 from yaml conf file like:
>>
>> env.java.opts.jobmanager: -Djava.security.krb5.conf=/path/krb5.conf
>> env.java.opts.taskmanager: -Djava.security.krb5.conf=/path/krb5.conf
>>
>> How can I resolve this? Is there another way to pass KRB5?
>>
>> I also tried via option#1 from flink run command -D parameter.
>>
>> Regards,
>> Vijay
>>
>>
>> On Tue, Aug 11, 2020 at 1:26 AM Dawid Wysakowicz  
>> wrote:
>>>
>>> Hi,
>>>
>>> As far as I know the approach 2) is the supported way of setting up 
>>> Kerberos authentication in Flink. In the second approach have you tried 
>>> setting the `sasl.kerberos.service.name` in the configuration of your 
>>> KafkaConsumer/Producer[1]? I think this might be the issue.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#enabling-kerberos-authentication
>>>
>>>
>>> On 09/08/2020 20:39, Vijayendra Yadav wrote:
>>>
>>> Hi Team,
>>>
>>> I am trying to stream data from kafkaconsumer using: 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>
>>> Here my KAFKA is Kerberos secured and SSL enabled.
>>>
>>> I am running my Flink streaming in yarn-cluster on EMR 5.31.
>>>
>>> I have tried to pass keytab/principal in following 2 Ways:
>>>
>>> 1) Passing as JVM property in Flink run Command.
>>>
>>> /usr/lib/flink/bin/flink run \
>>>   -yt ${app_install_path}/conf/ \
>>>   -Dsecurity.kerberos.login.use-ticket-cache=false \
>>>   -yDsecurity.kerberos.login.use-ticket-cache=false \
>>>   -Dsecurity.kerberos.login.keytab=${app_install_path}/conf/keytab \
>>>   -yDsecurity.kerberos.login.keytab=${app_install_path}/conf/.keytab \
>>>   -Djava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
>>>   -yDjava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
>>>   -Dsecurity.kerberos.login.principal=x...@xx.net \
>>>   -yDsecurity.kerberos.login.principal=x...@xx.net \
>>>   -Dsecurity.kerberos.login.contexts=Client,KafkaClient \
>>>   -yDsecurity.kerberos.login.contexts=Client,KafkaClient
>>>
>>>
>>> Here, I am getting the following Error, it seems like KEYTAB Was not 
>>> transported to the run environment and probably not found.
>>>
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>> Caused by: java.lang.IllegalArgumentException: Could not find a 
>>> 'KafkaClient' entry in the JAAS configuration. System property 
>>> 'java.security.auth.login.config'
>>>
>>> 2) Passing from flink config:  /usr/lib/flink/conf/flink-conf.yaml
>>>
>>> security.kerberos.login.use-ticket-cache: false
>>> security.kerberos.login.keytab:  ${app_install_path}/conf/keytab
>>> security.kerberos.login.principal:  x...@xx.net
>>> security.kerberos.login.contexts: Client,KafkaClient
>>>
>>> Here, I am getting the following Error,
>>>
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>> Caused by: org.apache.kafka.common.KafkaException: 
>>> java.lang.IllegalArgumentException: No serviceName defined in either JAAS 
>>> or Kafka config
>>>
>>>
>>> Could you please help find, what are probable causes and resolution?
>>>
>>> Regards,
>>> Vijay
>>>


Re: Avro format in pyFlink

2020-08-13 Thread Xingbo Huang
Hi Rodrigo,
For the connectors, Pyflink just wraps the java implementation.
And I am not an expert on Avro and corresponding connectors, but as far as
I know, DataTypes really cannot declare the type of union you mentioned.
Regarding the bytes encoding you mentioned, I actually have no good
suggestions.
I think we need an Avro expert to answer your question.

Best,
Xingbo

rodrigobrochado  于2020年8月14日周五 上午10:07写道:

>
> The upload of the schema through Avro(avro_schema) worked, but I had to
> select one type from the union type to put in Schema.field(field_type)
> inside t_env.connect(). If my dict has long and double values, and I
> declare
> Schema.field(DataTypes.Double()), all the int values are cast to double. My
> maps will also have string values and the job will crash using this
> configuration.
>
> Is there any workaround? If not, I thought of serializing it on the UDTF
> using the python avro lib and sending it as bytes to the sink. The problem
> is that all serialization formats change the original schema: the CSV format
> uses base64 encoding for bytes; the JSON format adds a key, to form a
> key/value pair, where the value will be the binary; and the Avro format adds 3
> bytes at the beginning of the message.
>
> Thanks,
> Rodrigo
>
>
>
>


Re: [Flink-KAFKA-KEYTAB] Kafkaconsumer error Kerberos

2020-08-13 Thread Vijayendra Yadav
Any inputs ?

On Tue, Aug 11, 2020 at 10:34 AM Vijayendra Yadav 
wrote:

> Dawid, I was able to resolve the keytab issue by passing the service name,
> but now I am facing the KRB5 issue.
>
> Caused by: org.apache.kafka.common.errors.SaslAuthenticationException:
> Failed to create SaslClient with mechanism GSSAPI
> Caused by: javax.security.sasl.SaslException: Failure to initialize
> security context [Caused by GSSException: Invalid name provided (Mechanism
> level: KrbException: Cannot locate default realm)]
>
> I passed KRB5 from yaml conf file like:
>
> env.java.opts.jobmanager: -Djava.security.krb5.conf=/path/krb5.conf
> env.java.opts.taskmanager: -Djava.security.krb5.conf=/path/krb5.conf
>
> How can I resolve this? Is there another way to pass KRB5?
>
> I also tried via option#1 from flink run command -D parameter.
>
> Regards,
> Vijay
>
>
> On Tue, Aug 11, 2020 at 1:26 AM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> As far as I know the approach 2) is the supported way of setting up
>> Kerberos authentication in Flink. In the second approach have you tried
>> setting the `sasl.kerberos.service.name` in the configuration of your
>> KafkaConsumer/Producer[1]? I think this might be the issue.
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#enabling-kerberos-authentication
>>
>>
>> On 09/08/2020 20:39, Vijayendra Yadav wrote:
>>
>> Hi Team,
>>
>> I am trying to stream data from kafkaconsumer using:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>
>> Here my KAFKA is Kerberos secured and SSL enabled.
>>
>> I am running my Flink streaming in yarn-cluster on EMR 5.31.
>>
>> I have tried to pass keytab/principal in following *2 Ways*:
>>
>> 1) Passing as JVM property in Flink run Command.
>>
>> /usr/lib/flink/bin/flink run \
>>   -yt ${app_install_path}/conf/ \
>>   -Dsecurity.kerberos.login.use-ticket-cache=false \
>>   -yDsecurity.kerberos.login.use-ticket-cache=false \
>>   -Dsecurity.kerberos.login.keytab=${app_install_path}/conf/keytab \
>>   -yDsecurity.kerberos.login.keytab=${app_install_path}/conf/.keytab \
>>   -Djava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
>>   -yDjava.security.krb5.conf=${app_install_path}/conf/krb5.conf \
>>   -Dsecurity.kerberos.login.principal=x...@xx.net \
>>   -yDsecurity.kerberos.login.principal=x...@xx.net \
>>   -Dsecurity.kerberos.login.contexts=Client,KafkaClient \
>>   -yDsecurity.kerberos.login.contexts=Client,KafkaClient
>>>
>>
>> *Here, I am getting the following Error, it seems like KEYTAB Was not
>> transported to the run environment and probably not found.*
>>
>>
>> *org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer Caused by: java.lang.IllegalArgumentException: Could not find a
>> 'KafkaClient' entry in the JAAS configuration. System property
>> 'java.security.auth.login.config'*
>>
>> 2) Passing from flink config:  * /usr/lib/flink/conf/flink-conf.yaml*
>>
>> security.kerberos.login.use-ticket-cache: false
>> security.kerberos.login.keytab:  ${app_install_path}/conf/keytab
>> security.kerberos.login.principal:  x...@xx.net
>> security.kerberos.login.contexts: Client,KafkaClient
>>
>> *Here, I am getting the following Error, *
>>
>>
>> *org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer Caused by: org.apache.kafka.common.KafkaException:
>> java.lang.IllegalArgumentException: No serviceName defined in either JAAS
>> or Kafka config*
>>
>>
>> Could you please help find, what are probable causes and resolution?
>>
>> Regards,
>> Vijay
>>
>>


Re: k8s job cluster using StatefulSet

2020-08-13 Thread Yang Wang
Hi Alexey,

Actually, StatefulSets could also be used to start the JobManager and
TaskManager.

So why do we suggest using Deployments in the Flink documentation?
* StatefulSets require the user to have persistent volumes in the K8s
cluster. However, that is not always the case,
  especially for unmanaged (self-built) K8s clusters.
* Flink uses Zookeeper and distributed storage (S3, GFS, etc.) to handle
fault tolerance. If you start multiple
  JobManagers, leader election and leader retrieval will be done via
Zookeeper. The meta information will
  also be stored in Zookeeper. So it is unnecessary to use a StatefulSet to
do more.
* The local data of the JobManager and TaskManager is ephemeral. It can be
discarded after a crash.


Best,
Yang




Arvid Heise  于2020年8月13日周四 下午4:38写道:

> Hi Alexey,
>
> I don't see any issue in using stateful sets immediately.
>
> I'd recommend using one of the K8s operators or Ververica's community
> edition [1] though if you start with a new setup as they may solve even
> more issues that you might experience in the future.
>
> [1] https://www.ververica.com/getting-started
>
> On Mon, Aug 10, 2020 at 11:22 PM Alexey Trenikhun  wrote:
>
>> Hello,
>> Flink documentation suggests to use Deployments to deploy JM and TM for
>> kubernetes job cluster. Is any known potential issues with using
>> StatefulSets instead, seems StatefullSet provides uniqueness for JM during
>> upgrade/rollback, while with Deployments could be multiple JM pods (e.g.1
>> terminating and 1 running)
>>
>> Thanks,
>> Alexey
>>
>
>
>


Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

2020-08-13 Thread Yang Wang
Hi kevin,

Thanks for sharing more information. You are right. Actually, "too old
resource version" is caused by a bug
of fabric8 kubernetes-client [1]. It has been fixed in v4.6.1. And we have
bumped the kubernetes-client version
to v4.9.2 in Flink release-1.11. Also it has been backported to release
1.10 and will be included in the next
minor release version(1.10.2).

BTW, if you really want all your jobs recovered when the jobmanager crashes,
you still need to configure Zookeeper high availability.

[1]. https://github.com/fabric8io/kubernetes-client/pull/1800


Best,
Yang

Bohinski, Kevin  于2020年8月14日周五 上午6:32写道:

> Might be useful
>
> https://stackoverflow.com/a/61437982
>
>
>
> Best,
>
> kevin
>
>
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Thursday, August 13, 2020 at 6:13 PM
> *To: *Yang Wang 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job
> never recovers
>
>
>
> Hi
>
>
>
> Got the logs on crash, hopefully they help.
>
>
>
> 2020-08-13 22:00:40,336 ERROR
> org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
> error occurred in ResourceManager.
>
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 8617182 (8633230)
>
> at
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_262]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_262]
>
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
> 2020-08-13 22:00:40,337 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
>
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 8617182 (8633230)
>
> at
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_262]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_262]
>
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
> 2020-08-13 22:00:40,416 INFO
> org.apache.flink.runtime.blob.BlobServer [] - Stopped
> BLOB server at 0.0.0.0:6124
>
>
>
> Best,
>
> kevin
>
>
>
>
>
> 

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Leonard Xu
Hi, Weizheng


> On Aug 13, 2020, at 19:44, Danny Chan wrote:
> 
> tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job 
> to the backend cluster with a builtin job name

`tEnv.executeSql` is an asynchronous method which will submit the job
immediately. If you're testing in your IDE, you'd better obtain the TableResult
object and wait for the execution as in the following piece of code;
otherwise the `main()` method in your demo may exit before the execution has finished.

TableResult result = tableEnvironment.executeSql("insert into ... ");
// wait for the insert job finished
result.getJobClient().get()
    .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
    .get();

Best
Leonard

Re: Hostname for taskmanagers when running in docker

2020-08-13 Thread Xintong Song
Hi Nikola,

I'm not entirely sure about how this happened. Would need some more
information to investigate, such as the complete configurations for
taskmanagers in your docker compose file, and the taskmanager logs.

One quick thing you may try is to explicitly set the configuration option
`taskmanager.host` for your task managers, see if that is reflected in the
metrics.

Thank you~

Xintong Song



On Wed, Aug 12, 2020 at 3:06 PM Nikola Hrusov  wrote:

> Hello,
>
> After upgrading the docker image for Flink from 1.9 to 1.11.1, the hostnames
> of the taskmanagers reported to our metrics show as IPs (e.g. 10.0.23.101)
> instead of hostnames.
>
> In the docker compose file we specify the hostname as such:
>
>
> *hostname: "taskmanager-{{ '{{' }}.Node.Hostname{{ '}}' }}"*
>
> Is there another way of achieving this?
>
> Regards,
> Nikola Hrusov
>


Re: Avro format in pyFlink

2020-08-13 Thread rodrigobrochado


The upload of the schema through Avro(avro_schema) worked, but I had to
select one type from the union type to put in Schema.field(field_type)
inside t_env.connect(). If my dict has long and double values, and I declare
Schema.field(DataTypes.Double()), all the int values are cast to double. My
maps will also have string values and the job will crash using this
configuration. 

Is there any workaround? If not, I thought of serializing it on the UDTF
using the python avro lib and sending it as bytes to the sink. The problem
is that all serialization formats change the original schema: the CSV format
uses base64 encoding for bytes; the JSON format adds a key, to form a
key/value pair, where the value will be the binary; and the Avro format adds 3
bytes at the beginning of the message.

Thanks,
Rodrigo





Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

2020-08-13 Thread Bohinski, Kevin
Might be useful
https://stackoverflow.com/a/61437982

Best,
kevin


From: "Bohinski, Kevin" 
Date: Thursday, August 13, 2020 at 6:13 PM
To: Yang Wang 
Cc: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never 
recovers

Hi

Got the logs on crash, hopefully they help.

2020-08-13 22:00:40,336 ERROR 
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal error 
occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource 
version: 8617182 (8633230)
at 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_262]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2020-08-13 22:00:40,337 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource 
version: 8617182 (8633230)
at 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_262]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2020-08-13 22:00:40,416 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:6124

Best,
kevin


From: Yang Wang 
Date: Sunday, August 9, 2020 at 10:29 PM
To: "Bohinski, Kevin" 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

Hi Kevin,

I think you may not have set the high availability configurations in your native K8s
session. Currently, we only support zookeeper HA, so you need to add the following
configuration. After HA is configured, the running jobs, checkpoints and other meta
information can be stored. When the jobmanager fails over, all the jobs can then be
recovered. I have tested that it works properly.


high-availability: zookeeper

high-availability.zookeeper.quorum: zk-client:2181

high-availability.storageDir: hdfs:///flink/recovery

I know you may not have a zookeeper cluster. You could use a zookeeper K8s
operator [1] to deploy a new one.

Moreover, it is not very convenient to use zookeeper as HA. So a K8s native HA
support [2] is in plan and we are trying to finish it in the next major release
cycle (1.12).

Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

2020-08-13 Thread Bohinski, Kevin
Hi

Got the logs on crash, hopefully they help.

2020-08-13 22:00:40,336 ERROR 
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal error 
occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource 
version: 8617182 (8633230)
at 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_262]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2020-08-13 22:00:40,337 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource 
version: 8617182 (8633230)
at 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
 [flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_262]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2020-08-13 22:00:40,416 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:6124

Best,
kevin


From: Yang Wang 
Date: Sunday, August 9, 2020 at 10:29 PM
To: "Bohinski, Kevin" 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

Hi Kevin,

I think you may not have set the high availability configurations in your native K8s
session. Currently, we only support zookeeper HA, so you need to add the following
configuration. After HA is configured, the running jobs, checkpoints and other meta
information can be stored. When the jobmanager fails over, all the jobs can then be
recovered. I have tested that it works properly.


high-availability: zookeeper

high-availability.zookeeper.quorum: zk-client:2181

high-availability.storageDir: hdfs:///flink/recovery

I know you may not have a zookeeper cluster. You could use a zookeeper K8s
operator [1] to deploy a new one.

Moreover, it is not very convenient to use zookeeper as HA. So a K8s native HA
support [2] is in plan and we are trying to finish it in the next major release
cycle (1.12).


[1]. 
https://github.com/pravega/zookeeper-operator

Performance Flink streaming kafka consumer sink to s3

2020-08-13 Thread Vijayendra Yadav
Hi Team,

I am trying to increase the throughput of my Flink streaming job, which streams from
a Kafka source and sinks to S3. Currently it is running fine for small event
records, but records with large payloads are processed extremely slowly, at a
rate of about 2 TPS.

Could you provide some best practices to tune?
Also, can we increase parallel processing, beyond the number of
kafka partitions that we have, without causing any overhead ?

Regards,
Vijay


Re: What async database library does the asyncio code example use?

2020-08-13 Thread Marco Villalobos
Thank you!

This was very helpful.

Sincerely, 

Marco A. Villalobos

> On Aug 13, 2020, at 1:24 PM, Arvid Heise  wrote:
> 
> Hi Marco,
> 
> you don't need to use an async library; you could simply write your code in 
> async fashion.
> 
> I'm trying to sketch the basic idea using any JDBC driver in the following 
> (it's been a while since I used JDBC, so don't take it too literally).
> 
> private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
>private transient ExecutorService executorService;
>private transient Connection dbConn;
>private transient PreparedStatement preparedStatement;
> 
>SampleAsyncFunction() {
>   this. = ;
>}
> 
>@Override
>public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   executorService = Executors.newFixedThreadPool(30);
>   dbConn = DriverManager.getConnection( < connection info >);
>   preparedStatement = dbConn.prepareStatement("SELECT * FROM WHERE ...");
>}
> 
>@Override
>public void close() throws Exception {
>   super.close();
>   executorService.shutdownNow();
>   preparedStatement.close();
>   dbConn.close();
>}
> 
>@Override
>public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
>   executorService.submit(() -> {
>  try {
> preparedStatement.setInt(0, input);
> final ResultSet resultSet = preparedStatement.executeQuery();
> 
> resultFuture.complete(Arrays.asList(resultSet.getString(0)));
>  } catch (SQLException e) {
> resultFuture.completeExceptionally(e);
>  }
>   });
>}
> }
> That's basically what all async libraries are doing behind the scenes 
> anyways: spawn a thread pool and call the callbacks when a submitted task 
> finishes.
> 
> To decide on the size of the thread pool, you should do some measurements 
> without Flink on how many queries you can execute in parallel. Also keep in 
> mind that if your async IO is run in parallel on the same task manager, that 
> your threads will multiply (you can also use a static, shared executor, but 
> it's a bit tricky to shutdown).
> 
> On Wed, Aug 12, 2020 at 8:16 PM KristoffSC  > wrote:
> Hi,
> I do believe that example from [1] where you see DatabaseClient is just a
> hint that whatever library you would use (db or REST based or whatever else)
> should be asynchronous or should actually not block. It does not have to be
> non blocking until it runs on its own thread pool that will return a feature
> or somewhat allowing you to register resultFuture.complete(...) on that
> future.
> 
> I actually wrote my own semi-library that registers onto
> resultFuture.complete(...) from each library thread.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>  
> 
> 
> 
> 
> 
> 
> 



Re: What async database library does the asyncio code example use?

2020-08-13 Thread Arvid Heise
Hi Marco,

you don't need to use an async library; you could simply write your code in
async fashion.

I'm trying to sketch the basic idea using any JDBC driver in the following
(it's been a while since I used JDBC, so don't take it too literally).

private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
   private transient ExecutorService executorService;
   private transient Connection dbConn;
   private transient PreparedStatement preparedStatement;

   SampleAsyncFunction() {
  this. = ;
   }

   @Override
   public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  executorService = Executors.newFixedThreadPool(30);
  dbConn = DriverManager.getConnection( < connection info >);
  preparedStatement = dbConn.prepareStatement("SELECT * FROM WHERE ...");
   }

   @Override
   public void close() throws Exception {
  super.close();
  executorService.shutdownNow();
  preparedStatement.close();
  dbConn.close();
   }

   @Override
   public void asyncInvoke(final Integer input, final
ResultFuture resultFuture) {
  executorService.submit(() -> {
 try {
preparedStatement.setInt(0, input);
final ResultSet resultSet = preparedStatement.executeQuery();

resultFuture.complete(Arrays.asList(resultSet.getString(0)));
 } catch (SQLException e) {
resultFuture.completeExceptionally(e);
 }
  });
   }
}

That's basically what all async libraries are doing behind the scenes
anyways: spawn a thread pool and call the callbacks when a submitted task
finishes.

To decide on the size of the thread pool, you should do some measurements
without Flink on how many queries you can execute in parallel. Also keep in
mind that if your async IO is run in parallel on the same task manager,
that your threads will multiply (you can also use a static, shared
executor, but it's a bit tricky to shutdown).
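
A possible shape for such a shared executor - just a sketch with reference
counting, so the pool is only shut down when the last function instance on the
task manager has closed (the helper class below is illustrative, not part of
Flink):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper shared by all async function instances in the same JVM.
final class SharedExecutorHolder {
    private static ExecutorService executor;
    private static int refCount;

    private SharedExecutorHolder() {}

    // call from RichAsyncFunction#open() instead of creating a new pool
    static synchronized ExecutorService acquire(int poolSize) {
        if (refCount == 0) {
            executor = Executors.newFixedThreadPool(poolSize);
        }
        refCount++;
        return executor;
    }

    // call from RichAsyncFunction#close(); the last caller shuts the pool down
    static synchronized void release() {
        if (--refCount == 0) {
            executor.shutdownNow();
            executor = null;
        }
    }
}

With this, open() would use executorService = SharedExecutorHolder.acquire(30);
and close() would call SharedExecutorHolder.release() instead of shutdownNow().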

On Wed, Aug 12, 2020 at 8:16 PM KristoffSC 
wrote:

> Hi,
> I do believe that example from [1] where you see DatabaseClient is just a
> hint that whatever library you would use (db or REST based or whatever
> else)
> should be asynchronous or should actually not block. It does not have to be
> non-blocking as long as it runs on its own thread pool that will return a
> future
> or somehow allow you to register resultFuture.complete(...) on that
> future.
>
> I actually wrote my own semi-library that registers onto
> resultFuture.complete(...) from each library thread.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>
>
>
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink Parquet Streaming FileSink with scala case class with optional fields error

2020-08-13 Thread Arvid Heise
Hi Vikash,

The error is coming from Parquet itself in conjunction with Avro (which is
used to infer the schema of your scala class). The inferred schema is

{
"fields": [
{
"name": "level",
"type": "string"
},
{
"name": "time_stamp",
"type": {
"fields": [],
"name": "Option",
"namespace": "scala",
"type": "record"
}
}
],
"name": "Log",
"namespace": "org.apache.flink.formats.parquet.avro",
"type": "record"
}

As you can see, Avro infers your schema, such that Option is treated as an
arbitrary class. Since it doesn't have any fields, you receive your error
from Parquet though.

I don't see an easy fix for it, but you can probably search for solutions
with Avro's ReflectData and scala.Option. As a workaround, you can refrain
from using an Option field, and go with a nullable field (you can translate
it into Option with a fancy getter).

In general, if you want to have more control over the schema, I'd suggest
to go schema first: Define your Avro schema and use avro-hugger to generate
the corresponding Scala class. In that way, Option is properly supported.

Best,

Arvid



On Wed, Aug 12, 2020 at 2:43 AM Vikash Dat  wrote:

> I have defined a streaming file sink for parquet to store my scala case
> class.
>
> StreamingFileSink
>   .forBulkFormat(
> new Path(appArgs.datalakeBucket),
> ParquetAvroWriters.forReflectRecord(classOf[Log])
>   )
>   .withBucketAssigner(new TransactionLogHiveBucketAssigner())
>   .build()
>
>
> where my case class is
>
> case class Log(
>   level: String,
>   time_stamp: Option[Long] = None
> )
>
>
> When Flink tries to write a specific instance to parquet
>
>
> Log("info",Some(159697595))
>
>
> it throws the following error:
>
>
> org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema
> with an empty group: required group time_stamp {
> }
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
> at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
> at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
> at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil
> .java:23)
> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(
> ParquetFileWriter.java:280)
> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:
> 283)
> at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter
> .java:564)
> at org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> .createAvroParquetWriter(ParquetAvroWriters.java:87)
> at org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> .lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
> at org.apache.flink.formats.parquet.ParquetWriterFactory.create(
> ParquetWriterFactory.java:57)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .rollPartFile(Bucket.java:222)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .write(Bucket.java:212)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets
> .onElement(Buckets.java:274)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink.invoke(StreamingFileSink.java:445)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(
> StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 730)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 708)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(
> StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
> at org.apache

[Announce] Flink Forward Global Program is now Live

2020-08-13 Thread Seth Wiesman
Hi Everyone



*The Flink Forward Global 2020 program is now online*, with 2 full days
of exciting Apache Flink content, curated by our program committee[1]! Join
us on October 21-22 to learn more about the newest technology updates, and
hear use cases from Intel, Razorpay, Workday, Microsoft, and other
industry-leading companies.



Here are some program highlights:

   - Fault Tolerance 2.0 for Apache Flink
   - Building an End-to-End Analytics Pipeline with PyFlink
   - Building Unified Streaming Platform at Uber
   - Flink powers Houzz Realtime Pipeline and Applications
   - Tale of Stateful Stream to Stream Processing


The conference is free to attend, and tickets are online[2].




We are also offering two days of hands-on training sessions on October
19-20[3]:

   - 2-part Apache Flink Developer Training
   - 2-part SQL Developer Training
   - Stateful Functions Training
   - Runtime & Operations Training
   - Introduction to Tuning & Troubleshooting
   - Advanced Tuning & Troubleshooting


We look forward to connecting with the global Flink community in just 2
months!


[1] https://www.flink-forward.org/global-2020/conference-program

[2]
https://www.eventbrite.com/e/flink-forward-global-virtual-2020-tickets-113775477516

[3] https://www.flink-forward.org/global-2020/training-program


Seth Wiesman

Flink Forward Global Program Committee Chair

Committer Apache Flink


Re: Status of a job when a kafka source dies

2020-08-13 Thread Nick Bendtner
Hi Piotr,
Sorry for the late reply. So the poll does not throw an exception when a
broker goes down. In Spring they solve it by generating an event [1]
whenever this happens, and you can intercept this event. consumer.timeout.ms
does help to some extent, but if the source topic does not receive any
messages for the specified value then it still throws an exception.

Best,
Nick.


[1]
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/event/NonResponsiveConsumerEvent.html

On Wed, Aug 5, 2020 at 1:30 PM Piotr Nowojski  wrote:

> Hi Nick,
>
> Could you elaborate more, what event and how would you like Flink to
> handle? Is there some kind of Kafka's API that can be used to listen to
> such kind of events? Becket, do you maybe know something about this?
>
> As a side note Nick, can not you configure some timeouts [1] in the
> KafkaConsumer? Like `request.timeout.ms` or `consumer.timeout.ms`? But as
> I wrote before, that would be more a question to Kafka guys.
>
> Piotrek
>
> [1] http://kafka.apache.org/20/documentation/
>
> śr., 5 sie 2020 o 19:58 Nick Bendtner  napisał(a):
>
>> +user group.
>>
>> On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:
>>
>>> Thanks Piotr but shouldn't this event be handled by the
>>> FlinkKafkaConsumer since the poll happens inside the FlinkKafkaConsumer.
>>> How can I catch this event in my code since I don't have control over the
>>> poll.
>>>
>>> Best,
>>> Nick.
>>>
>>> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
>>> wrote:
>>>
 Hi Nick,

 What Aljoscha was trying to say is that Flink is not trying to do any
 magic. If `KafkaConsumer` - which is being used under the hood of
 `FlinkKafkaConsumer` connector - throws an exception, this
 exception bubbles up causing the job to failover. If the failure is handled
 by the `KafkaConsumer` silently, that's what's happening. As we can see in the
 TM log that you attached, the latter seems to be happening - note that the
 warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
 that's not the code we (Flink developers) control.

 If you want to change this behaviour, unless someone here on this
 mailing list just happens to know the answer, the better place to ask such
 a question is the Kafka mailing list. Maybe there is some way to configure
 this.

 And sorry I don't know much about neither the KafkaConsumer nor the
 KafkaBrokers configuration :(

 Piotrek

 wt., 4 sie 2020 o 22:04 Nick Bendtner  napisał(a):

> Hi,
> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
> kafka and zookeeper on all broker nodes. On the flink side, I see the
> messages in the log ( data is obfuscated) . There are no error logs. The
> kafka consumer properties are
>
> 1. "bootstrap.servers"
>
> 2. "zookeeper.connect
>
> 3. "auto.offset.reset"
>
> 4. "group.id"
>
> 5."security.protocol"
>
>
> The flink consumer starts consuming data as soon as the kafka comes
> back up. So I want to know in what scenario/kafka consumer config will the
> job go to failed state after a finite number of restart attempts from
> checkpoint.
>
>
> TM log.
> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-5,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
> Broker may not be available.
> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
> Broker may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-4,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
> Broker may not be available.
> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>  - [Consumer clientId=consumer-6,
> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
> Broker may not be available.
>
> Best,
> Nick
>
> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Flink doesn't do any special failure-handling or retry logic, so it’s
>> up
>> to how the KafkaConsumer is configured via properties. In general
>> Flink
>> doesn’t try to be smart: when something fails an exception will
>> bubble
>> up that will fail this execution of the job. If checkpoints are
>> 

Re: Client's documentation for deploy and run remotely.

2020-08-13 Thread Jacek Grzebyta
It seems the documentation might be outdated. I probably found what I
wanted in a different thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Submit-Flink-1-11-job-from-java-td37245.html

Cheers,
Jacek

On Thu, 13 Aug 2020 at 14:23, Jacek Grzebyta  wrote:

> Hi,
>
> I have a problem with some examples in the documentation. Particularly I
> meant about that paragraph:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/parallel.html#client-level
>
> In the code there are used classes such as: Client and RemoteExecutor. I
> found those classes in the Flink ver 1.11 but they have very different
> usage and methods` signatures. It seems the page/paragraph is very
> outdated, alternatively the example is accurate but there were not added
> imports so I do not use correct modules/packages.
>
> Basically my problem I need to solve is to deploy a jar into a flink
> cluster and run a program remotely. I hope to use flink-clients_2.11 to
> solve this problem but with no success. This code is a part of a web
> service so it should be done programmatically. Otherwise I can do direct
> requests to the Monitoring REST API. But I prefer not to.
>
> Is it possible to find some better code examples for flink-client?
>
> Regards,
> Jacek
>
>
>
>
>


Re: Flink cluster deployment strategy

2020-08-13 Thread sidhant gupta
Thanks, I will check it out.

On Thu, 13 Aug, 2020, 7:55 PM Arvid Heise,  wrote:

> Hi Sidhant,
>
> If you are starting fresh with Flink, I strongly recommend to skip ECS and
> EMR and directly go to a kubernetes-based solution. Scaling is much easier
> on K8s, there will be some kind of autoscaling coming in the next release,
> and the best of it all: you even have the option to go to a different cloud
> provider if needed.
>
> The easiest option for you is to use EKS on AWS together with Ververica
> community edition [1] or with one of the many kubernetes operators.
>
> [1] https://www.ververica.com/getting-started
>
> On Tue, Aug 11, 2020 at 3:23 PM Till Rohrmann 
> wrote:
>
>> Hi Sidhant,
>>
>> see the inline comments for answers
>>
>> On Tue, Aug 11, 2020 at 3:10 PM sidhant gupta 
>> wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for your response.
>>> I have few queries though as mentioned below:
>>> (1) Can flink be used in map-reduce fashion with data streaming api ?
>>>
>>
>> What do you understand as map-reduce fashion? You can use Flink's DataSet
>> API for processing batch workloads (consisting not only of map and reduce
>> operations but also other operations such as groupReduce, flatMap, etc.).
>> Flink's DataStream API can be used to process bounded and unbounded
>> streaming data.
>>
>> (2) Does it make sense to use aws EMR if we are not using flink in
>>> map-reduce fashion with streaming api ?
>>>
>>
>> I think I don't fully understand what you mean with map-reduce fashion.
>> Do you mean multiple stages of map and reduce operations?
>>
>>
>>> (3) Can flink cluster be auto scaled using EMR Managed Scaling when used
>>> with yarn as per this link
>>> https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-managed-scaling-automatically-resize-clusters-to-lower-cost/
>>>  ?
>>>
>>
>> I am no expert on EMR managed scaling but I believe that it would need
>> some custom tooling to scale a Flink job down (by taking a savepoint and
>> resuming from it with a lower parallelism) before downsizing the EMR
>> cluster.
>>
>>
>>> (4) If we set an explicit max parallelism, and set current parallelism
>>> (which might be less than the max parallelism) equal to the maximum number
>>> of slots and set slots per task manager while starting the yarn session,
>>> then if we increase the task managers as per auto scaling, would the
>>> parallelism increase (up to the max parallelism) and would the load
>>> be distributed across the newly spun up task managers? Refer:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/production_ready.html#set-an-explicit-max-parallelism
>>>
>>>
>>
>> At the moment, Flink does not support this out of the box but the
>> community is working on this feature.
>>
>>>
>>> Regards
>>> Sidhant Gupta
>>>
>>> On Tue, 11 Aug, 2020, 5:19 PM Till Rohrmann, 
>>> wrote:
>>>
 Hi Sidhant,

 I am not an expert on AWS services but I believe that EMR might be a
 bit easier to start with since AWS EMR comes with Flink support out of the
 box [1]. On ECS I believe that you would have to set up the containers
 yourself. Another interesting deployment option could be to use Flink's
 native Kubernetes integration [2] which would work on AWS EKS.

 [1]
 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-create-cluster.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html

 Cheers,
 Till

 On Tue, Aug 11, 2020 at 9:16 AM sidhant gupta 
 wrote:

> Hi all,
>
> I'm kind of new to flink cluster deployment. I wanted to know which
> flink
> cluster deployment and which job mode in aws is better in terms of
> ease of
> deployment, maintenance, HA, cost, etc. As of now I am considering aws
> EMR
> vs ECS (docker containers). We have a usecase of setting up a data
> streaming api which reads records from a Kafka topic, process it and
> then
> write to another Kafka topic. Please let me know your thoughts on
> this.
>
> Thanks
> Sidhant Gupta
>

>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Question about ParameterTool

2020-08-13 Thread Arvid Heise
Since Picocli does not have any dependencies on its own, it's safe to use.
It's a bit quirky to use with Scala, but it's imho the best CLI library for
Java.

The only downside as Chesnay mentioned is the increased jar size. Also note
that Flink is not graal-ready.
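
For readers following this thread, here is a minimal sketch of the built-in
ParameterTool approach that Robert and Chesnay refer to; the parameter names
and defaults are made up for illustration:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {
    public static void main(String[] args) throws Exception {
        // e.g. --input /tmp/in --parallelism 4 (hypothetical parameters)
        ParameterTool params = ParameterTool.fromArgs(args);
        String input = params.getRequired("input");
        int parallelism = params.getInt("parallelism", 1);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // makes the parameters visible in the web UI and in rich functions
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(parallelism);

        // ... build the pipeline using `input` ...
        env.fromElements(input).print();

        env.execute("parameter-tool-demo");
    }
}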

Best,

Arvid


On Wed, Aug 12, 2020 at 12:27 AM Chesnay Schepler 
wrote:

> The benefit of the ParameterTool is that you do not increase your
> dependency footprint by using it.
>
> When using another CLI library you will generally package it within your
> user-jar, which may or may not increase the risk of dependency conflicts.
> Whether, and how large this risk is, depends naturally on the library.
> This also results in a larger jar file, which may or may not be relevant
> for you.
>
> On 11/08/2020 23:35, Marco Villalobos wrote:
>
> Thank you for the clarification.
>
> But does it offer any additional benefits that are not clearly documented?
>
>
>
> On Tue, Aug 11, 2020 at 12:22 PM Robert Metzger 
> wrote:
>
>> Hi,
>> there are absolutely no dangers not using ParameterTool.
>> It is used by the Flink examples, and as a showcase for global job
>> parameters:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#register-the-parameters-globally
>>
>> On Tue, Aug 11, 2020 at 7:13 PM Marco Villalobos <
>> mvillalo...@kineteque.com> wrote:
>>
>>> What are the dangers of not using the ParameterTool for parsing command
>>> line parameters?
>>>
>>> I have been using Picocli (https://picocli.info/). Will this be a
>>> mistake? Are there any side-effects that I should be aware of?
>>>
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink cluster deployment strategy

2020-08-13 Thread Arvid Heise
Hi Sidhant,

If you are starting fresh with Flink, I strongly recommend to skip ECS and
EMR and directly go to a kubernetes-based solution. Scaling is much easier
on K8s, there will be some kind of autoscaling coming in the next release,
and the best of it all: you even have the option to go to a different cloud
provider if needed.

The easiest option for you is to use EKS on AWS together with Ververica
community edition [1] or with one of the many kubernetes operators.

[1] https://www.ververica.com/getting-started

On Tue, Aug 11, 2020 at 3:23 PM Till Rohrmann  wrote:

> Hi Sidhant,
>
> see the inline comments for answers
>
> On Tue, Aug 11, 2020 at 3:10 PM sidhant gupta  wrote:
>
>> Hi Till,
>>
>> Thanks for your response.
>> I have few queries though as mentioned below:
>> (1) Can flink be used in map-reduce fashion with data streaming api ?
>>
>
> What do you understand as map-reduce fashion? You can use Flink's DataSet
> API for processing batch workloads (consisting not only of map and reduce
> operations but also other operations such as groupReduce, flatMap, etc.).
> Flink's DataStream API can be used to process bounded and unbounded
> streaming data.
>
> (2) Does it make sense to use aws EMR if we are not using flink in
>> map-reduce fashion with streaming api ?
>>
>
> I think I don't fully understand what you mean with map-reduce fashion. Do
> you mean multiple stages of map and reduce operations?
>
>
>> (3) Can flink cluster be auto scaled using EMR Managed Scaling when used
>> with yarn as per this link
>> https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-managed-scaling-automatically-resize-clusters-to-lower-cost/
>>  ?
>>
>
> I am no expert on EMR managed scaling but I believe that it would need
> some custom tooling to scale a Flink job down (by taking a savepoint and
> resuming from it with a lower parallelism) before downsizing the EMR
> cluster.
>
>
>> (4) If we set an explicit max parallelism, and set current parallelism
>> (which might be less than the max parallelism) equal to the maximum number
>> of slots and set slots per task manager while starting the yarn session,
>> then if we increase the task managers as per auto scaling, would the
>> parallelism increase (up to the max parallelism) and would the load
>> be distributed across the newly spun up task managers? Refer:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/production_ready.html#set-an-explicit-max-parallelism
>>
>>
>
> At the moment, Flink does not support this out of the box but the
> community is working on this feature.
>
>>
>> Regards
>> Sidhant Gupta
>>
>> On Tue, 11 Aug, 2020, 5:19 PM Till Rohrmann, 
>> wrote:
>>
>>> Hi Sidhant,
>>>
>>> I am not an expert on AWS services but I believe that EMR might be a bit
>>> easier to start with since AWS EMR comes with Flink support out of the box
>>> [1]. On ECS I believe that you would have to set up the containers
>>> yourself. Another interesting deployment option could be to use Flink's
>>> native Kubernetes integration [2] which would work on AWS EKS.
>>>
>>> [1]
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-create-cluster.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 11, 2020 at 9:16 AM sidhant gupta 
>>> wrote:
>>>
 Hi all,

 I'm kind of new to flink cluster deployment. I wanted to know which
 flink
 cluster deployment and which job mode in aws is better in terms of ease
 of
 deployment, maintenance, HA, cost, etc. As of now I am considering aws
 EMR
 vs ECS (docker containers). We have a usecase of setting up a data
 streaming api which reads records from a Kafka topic, process it and
 then
 write to another Kafka topic. Please let me know your thoughts on
 this.

 Thanks
 Sidhant Gupta

>>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink job percentage

2020-08-13 Thread Arvid Heise
Hi Flavio,

This is a daunting task to implement properly. There is an easy workaround
used in related workflow systems though. Assuming that it's a recurring job,
you simply store the run times of past runs, apply some kind of low-pass
filter (i.e. a decaying average), and compare the current runtime with the
expected runtime. Even if Flink had some estimation, it would probably not
be more accurate than this.
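
A minimal sketch of that heuristic, purely illustrative (the class, field
names and smoothing factor are not part of any Flink API):

public class ProgressEstimator {
    private final double alpha;        // smoothing factor of the low-pass filter, e.g. 0.3
    private double expectedRuntimeMs;  // decaying average of previous run times

    public ProgressEstimator(double alpha, double initialEstimateMs) {
        this.alpha = alpha;
        this.expectedRuntimeMs = initialEstimateMs;
    }

    /** Call once after every finished run to update the expected runtime. */
    public void recordFinishedRun(double actualRuntimeMs) {
        expectedRuntimeMs = alpha * actualRuntimeMs + (1 - alpha) * expectedRuntimeMs;
    }

    /** Rough completion estimate for the currently running job, capped below 100%. */
    public double estimatePercentage(double elapsedMs) {
        return Math.min(100.0 * elapsedMs / expectedRuntimeMs, 99.0);
    }
}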

Best,

Arvid

On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger  wrote:

> Hi Flavio,
>
> I'm not aware of such a heuristic being implemented anywhere. You need to
> come up with something yourself.
>
> On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> one of our customers asked us to see a percentage of completion of a
>> Flink Batch job. Is there any already implemented heuristic I can use to
>> compute it? Will this be possible also when DataSet api will migrate to
>> DataStream..?
>>
>> Thanks in advance,
>> Flavio
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Client's documentation for deploy and run remotely.

2020-08-13 Thread Jacek Grzebyta
Hi,

I have a problem with some examples in the documentation. In particular, I
mean this paragraph:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/parallel.html#client-level

The code uses classes such as Client and RemoteExecutor. I found those
classes in Flink 1.11, but they have very different usage and method
signatures. It seems the page/paragraph is very outdated; alternatively,
the example is accurate but no imports were given, so I am not using the
correct modules/packages.

Basically, the problem I need to solve is to deploy a jar into a Flink
cluster and run a program remotely. I hoped to use flink-clients_2.11 to
solve this, but with no success. This code is part of a web service, so it
should be done programmatically. Otherwise I could make direct requests to
the Monitoring REST API, but I would prefer not to.

Is it possible to find some better code examples for flink-client?

Regards,
Jacek


Re: Using managed keyed state with AsynIo

2020-08-13 Thread KristoffSC
Hi Arvid,
thank you for the response.
Yeah I tried to run my job shortly after posting my message and I got "State
is not supported in rich async function" ;)

I came up with a solution that would solve my initial problem - the
concurrent/async problem of processing messages with the same key - but
unfortunately state is not supported here.

Thank you for the proposition
source -> keyby -> map (retrieve state) -> async IO (use state) -> map
(update state)

However I'm a little bit surprised. I thought that state on a keyed stream
cannot be shared between operators, and here you are suggesting doing that.
Is it possible then?


Using this occasion I have an additional question: is there any difference
from Flink's perspective between these two approaches:

MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is
stateless object, but it uses Flink keyed state.

Setup 1:

source -> keyBy(key) ->  proces(pf) -> map() -> process(pf) -> sink

Setup 2:
source -> keyBy(key) ->  proces(new MyProcessFunction()) -> map() ->
process(new MyProcessFunction()) -> sink 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Danny Chan
Weizheng ~

tEnv.executeSql executes the SQL asynchronously, e.g. it submits a job to
the backend cluster with a built-in job name; the TableResult that
tEnv.executeSql returns comes back immediately with a constant affected-rows
count of -1.
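
To illustrate, a small sketch of what that means in practice for Weizheng's
job (the SQL is abbreviated, and the JobClient call shown is the 1.11-era
signature, so the exact API may differ in other versions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ExecuteSqlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // create source and sink tables here (omitted, see the code earlier in the thread)...

        // executeSql() submits the INSERT job itself and returns immediately,
        // so no trailing env.execute() is needed.
        TableResult result = tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT ... FROM sensor");

        // Optional: block until the job finishes, e.g. in a test.
        result.getJobClient().ifPresent(jobClient -> {
            try {
                jobClient.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}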

Best,
Danny Chan
On 2020-08-13 at 3:46 PM +0800, Lu Weizheng wrote:
> Thanks Timo,
>
> So no need to use the execute() method in Flink SQL if I do all the things from
> source to sink in SQL.
>
> Best Regards,
> Lu
>
> > On 2020-08-13 at 3:41 PM, Timo Walther wrote:
> >
> > Hi Lu,
> >
> > `env.execute("table api");` is not necessary after FLIP-84 [1]. Every 
> > method that has `execute` in its name will immediately execute a job. 
> > Therefore your `env.execute` has an empty pipeline.
> >
> > Regards,
> > Timo
> >
> > [1] 
> > https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >
> > On 13.08.20 09:34, Lu Weizheng wrote:
> > > Hi,
> > > I am using Flink 1.11 SQL using java. All my operations are in SQL. I 
> > > create source tables and insert result into sink tables. No other Java 
> > > operators. I execute it in Intellij. I can get the final result in the 
> > > sink tables. However I get the following error. I am not sure it is a bug 
> > > or there is something wrong in my code? Acutally it does not affect the 
> > > computation.
> > > /Exception in thread "main" java.lang.IllegalStateException: No operators 
> > > defined in streaming topology. Cannot execute./
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/
> > > /at 
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/
> > > /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/
> > > Here's my code:
> > > EnvironmentSettings fsSettings = 
> > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > > StreamExecutionEnvironment env = 
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> > > fsSettings);
> > > // create source and sink tables...
> > > tEnv.executeSql("INSERT INTO sensor_1min_avg " +
> > > "SELECT " +
> > > " room, " +
> > > " AVG(temp) AS avg_temp," +
> > > " TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
> > > "FROM sensor " +
> > > "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
> > > env.execute("table api");
> >
>


Re: TaskManagers are still up even after job execution completed in PerJob deployment mode

2020-08-13 Thread Till Rohrmann
Hi Narasimha,

if you are deploying the Flink cluster manually on K8s then there is
no automatic way of stopping the TaskExecutor/TaskManager pods. This is
something you have to do manually (similar to a standalone deployment). The
only clean up mechanism is the automatic termination of the TaskManager
processes if they cannot connect to the ResourceManager after the specified
timeout. However, you can use Flink's native K8s integration with which you
can also deploy a per-job mode cluster [1]. The native K8s integration is
able to clean up the whole cluster.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html

Cheers,
Till

On Thu, Aug 13, 2020 at 11:26 AM Kostas Kloudas  wrote:

> Hi Narasimha,
>
> I am not sure why the TMs are not shutting down, as Yun said, so I am
> cc'ing Till here as he may be able to shed some light.
> For the application mode, the page in the documentation that you
> pointed is the recommended way to deploy an application in application
> mode.
>
> Cheers,
> Kostas
>
> On Mon, Aug 10, 2020 at 11:16 AM narasimha  wrote:
> >
> > Thanks, Yun for the prompt reply.
> >
> > TaskManager was actively looking for ResourceManager, on timeout of 5
> mins it got terminated.
> >
> > Any recommendations around this? Or is this the way this will work.
> >
> > What should be done around this to make the application start in
> application deployment mode?
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-application
> >
> > Here it has shown to invoke Flink binary to start. Is this the preferred
> way?
> >
> >
> > On Mon, Aug 10, 2020 at 1:46 PM Yun Tang  wrote:
> >>
> >> Hi
> >>
> >> From your description, the task managers are still alive even the job
> is finished and job manager has shut down?
> >> If so, I think this is really weird, could you check what the TM is
> doing via jstack and the logs in job manager and idle task manager?
> >> The task manager should be released when the JM is shutting down.
> >> Moreover, idle task manager would also release after 30 seconds by
> default [1].
> >>
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resourcemanager-taskmanager-timeout
> >>
> >> Best
> >> Yun Tang
> >>
> >>
> >> 
> >> From: narasimha 
> >> Sent: Monday, August 10, 2020 15:36
> >> To: user@flink.apache.org 
> >> Subject: TaskManagers are still up even after job execution completed
> in PerJob deployment mode
> >>
> >>
> >> I'm trying out Flink Per-Job deployment using docker-compose.
> >>
> >> Configurations:
> >>
> >> version: "2.2"
> >> jobs:
> >>   jobmanager:
> >> build: ./
> >> image: flink_local:1.1
> >> ports:
> >>   - "8081:8081"
> >> command: standalone-job --job-classname com.organization.BatchJob
> >> environment:
> >>   - |
> >> FLINK_PROPERTIES=
> >> jobmanager.rpc.address: jobmanager
> >> parallelism.default: 2
> >>   taskmanager:
> >> image: flink_local:1.1
> >> depends_on:
> >>   - jobmanager
> >> command: taskmanager
> >> scale: 1
> >> environment:
> >>   - |
> >> FLINK_PROPERTIES=
> >> jobmanager.rpc.address: jobmanager
> >> taskmanager.numberOfTaskSlots: 2
> >> parallelism.default: 2
> >>
> >> Flink image is extended with job.jar, Job executed successfully.
> >>
> >> JobManager exited after the job completed, but the TaskManager is still
> running, which is not expected.
> >>
> >> Do any configurations have to be added to exit both JobManager and
> TaskManager?
> >>
> >> Versions:
> >>
> >> Flink - 1.11.0
> >>
> >> Java - 1.8
> >>
> >>
> >> --
> >> A.Narasimha Swamy
> >>
> >
> >
> > --
> > A.Narasimha Swamy
> >
>


Re: getting error after upgrade Flink 1.11.1

2020-08-13 Thread Kostas Kloudas
Hi Dasraj,

Yes, I would recommend using Public and, if necessary, PublicEvolving
APIs as they provide better guarantees for future maintenance.
Unfortunately there are no docs about which APIs are Public or
PublicEvolving, but you can see the annotations on the classes in the
source code.
I guess you have access to the source code given that previously you
were using the cluster client directly.

You can always of course use Flink's REST API [1] to submit a job,
which provides stability guarantees.
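
As a rough illustration of the REST route, here is a sketch of the run call;
the jar id, REST address and entry class are placeholders, and it assumes the
jar was already uploaded via POST /jars/upload:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestJobSubmit {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://jobmanager:8081";      // placeholder
        String jarId = "d9f4c7e2-1234_my-job.jar";          // placeholder id returned by /jars/upload

        // Start the job from the uploaded jar; entryClass and parallelism are optional fields.
        String body = "{\"entryClass\":\"com.example.MyJob\",\"parallelism\":2}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restAddress + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // On success the response JSON contains the id of the submitted job.
        System.out.println(response.statusCode() + " " + response.body());
    }
}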

If you want to risk a bit more, you can check the PipelineExecutors in
the codebase to see how job submission is currently done within Flink.
This will, I hope, give you some ideas about how to proceed.

I hope this helps,
Kostas

[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

On Fri, Aug 7, 2020 at 8:07 AM dasraj  wrote:
>
> Hi Kostas,
>
> I am trying to migrate our code base to use new ClusterClient method for job
> submission.
> As you are recommending using the new PublicEvolving APIs, any doc or link for
> reference would be helpful.
>
> Thanks,
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: TaskManagers are still up even after job execution completed in PerJob deployment mode

2020-08-13 Thread Kostas Kloudas
Hi Narasimha,

I am not sure why the TMs are not shutting down, as Yun said, so I am
cc'ing Till here as he may be able to shed some light.
For the application mode, the page in the documentation that you
pointed to is the recommended way to deploy an application in application
mode.

Cheers,
Kostas

On Mon, Aug 10, 2020 at 11:16 AM narasimha  wrote:
>
> Thanks, Yun for the prompt reply.
>
> TaskManager was actively looking for ResourceManager, on timeout of 5 mins it 
> got terminated.
>
> Any recommendations around this? Or is this the way this will work.
>
> What should be done around this to make the application start in application 
> deployment mode?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-application
>
> Here it has shown to invoke Flink binary to start. Is this the preferred way?
>
>
> On Mon, Aug 10, 2020 at 1:46 PM Yun Tang  wrote:
>>
>> Hi
>>
>> From your description, the task managers are still alive even the job is 
>> finished and job manager has shut down?
>> If so, I think this is really weird, could you check what the TM is doing 
>> via jstack and the logs in job manager and idle task manager?
>> The task manager should be released when the JM is shutting down.
>> Moreover, idle task manager would also release after 30 seconds by default 
>> [1].
>>
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resourcemanager-taskmanager-timeout
>>
>> Best
>> Yun Tang
>>
>>
>> 
>> From: narasimha 
>> Sent: Monday, August 10, 2020 15:36
>> To: user@flink.apache.org 
>> Subject: TaskManagers are still up even after job execution completed in 
>> PerJob deployment mode
>>
>>
>> I'm trying out Flink Per-Job deployment using docker-compose.
>>
>> Configurations:
>>
>> version: "2.2"
>> jobs:
>>   jobmanager:
>> build: ./
>> image: flink_local:1.1
>> ports:
>>   - "8081:8081"
>> command: standalone-job --job-classname com.organization.BatchJob
>> environment:
>>   - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> parallelism.default: 2
>>   taskmanager:
>> image: flink_local:1.1
>> depends_on:
>>   - jobmanager
>> command: taskmanager
>> scale: 1
>> environment:
>>   - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> taskmanager.numberOfTaskSlots: 2
>> parallelism.default: 2
>>
>> Flink image is extended with job.jar, Job executed successfully.
>>
>> JobManager exited after the job completed, but the TaskManager is still
>> running, which is not expected.
>>
>> Do any configurations have to be added to exit both JobManager and TaskManager?
>>
>> Versions:
>>
>> Flink - 1.11.0
>>
>> Java - 1.8
>>
>>
>> --
>> A.Narasimha Swamy
>>
>
>
> --
> A.Narasimha Swamy
>


Re: k8s job cluster using StatefulSet

2020-08-13 Thread Arvid Heise
Hi Alexey,

I don't see any immediate issue in using StatefulSets.

I'd recommend using one of the K8s operators or Ververica's community
edition [1] though if you start with a new setup as they may solve even
more issues that you might experience in the future.

[1] https://www.ververica.com/getting-started

On Mon, Aug 10, 2020 at 11:22 PM Alexey Trenikhun  wrote:

> Hello,
> Flink documentation suggests to use Deployments to deploy JM and TM for
> kubernetes job cluster. Is any known potential issues with using
> StatefulSets instead, seems StatefullSet provides uniqueness for JM during
> upgrade/rollback, while with Deployments could be multiple JM pods (e.g.1
> terminating and 1 running)
>
> Thanks,
> Alexey
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Using managed keyed state with AsynIo

2020-08-13 Thread Arvid Heise
Hi KristoffSC,

Afaik asyncIO does not support state operations at all because of the
issues you mentioned (RichAsyncFunction fails if you access state).

I'd probably solve it by having a map or process function before and after
the asyncIO for the state operations. If you enable object reuse,
performance should be pretty much the same as if async I/O would support
it, but the threading model becomes much easier.

So, the pipeline is source -> keyby -> map (retrieve state) -> async IO
(use state) -> map (update state). You might need to return a Tuple of key
and value from map and asyncIO to have the full context information in the
subsequent operators; a rough sketch follows below.
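
A compressed sketch of that topology (imports omitted; `Event`, `client` and
the state layout are placeholders; note that the two maps hold independent
keyed state instances, since state is not shared across operators):

// placeholders: Event has a String `key`, client.query(...) returns a CompletableFuture<String>
DataStream<Tuple2<String, String>> withState = events
    .keyBy(e -> e.key)
    .map(new RichMapFunction<Event, Tuple2<String, String>>() {
        private transient ValueState<String> state;
        @Override
        public void open(Configuration conf) {
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("db-key", String.class));
        }
        @Override
        public Tuple2<String, String> map(Event e) throws Exception {
            // hand the current state value to the async stage together with the key
            return Tuple2.of(e.key, state.value());
        }
    });

DataStream<Tuple2<String, String>> asyncResult = AsyncDataStream.orderedWait(
    withState,
    new AsyncFunction<Tuple2<String, String>, Tuple2<String, String>>() {
        @Override
        public void asyncInvoke(Tuple2<String, String> in, ResultFuture<Tuple2<String, String>> out) {
            client.query(in.f1).thenAccept(dbResult ->
                out.complete(Collections.singleton(Tuple2.of(in.f0, dbResult))));
        }
    },
    30, TimeUnit.SECONDS);

asyncResult
    .keyBy(t -> t.f0)
    .map(new RichMapFunction<Tuple2<String, String>, String>() {
        private transient ValueState<String> state;  // a *separate* state from the first map
        @Override
        public void open(Configuration conf) {
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("full-record", String.class));
        }
        @Override
        public String map(Tuple2<String, String> t) throws Exception {
            state.update(t.f1);  // store the async result for the next record of this key
            return t.f1;
        }
    });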

On Mon, Aug 10, 2020 at 4:24 PM KristoffSC 
wrote:

> Hi guys,
> I'm using Flink 1.9.2
>
> I have a question about a use case where I would like to use Flink's managed
> keyed state with Async IO [1]
>
>
> Lets take as a base line below example taken from [1] and lets assume that
> we are executing this on a keyed stream.
>
> final Future<String> result = client.query(key);
>
> CompletableFuture.supplyAsync(new Supplier<String>() {
>
> @Override
> public String get() {
> try {
> return result.get();
> } catch (InterruptedException | ExecutionException e) {
> // Normally handled explicitly.
> return null;
> }
> }
> }).thenAccept( (String dbResult) -> {
> resultFuture.complete(Collections.singleton(new Tuple2<>(key,
> dbResult)));
> });
>
>
> Imagine that instead of passing the key to client.query(..) we will pass some
> value
> taken from Flink's managed keyed state. Later the supplier's get method
> will
> return a value that should be stored in that state. In other words, we use
> previous results as inputs for next computations.
>
> Is this achievable with Flink's AsyncIO? I can have many pending requests on
> client.query which can finish in a random order. The
> AsyncDataStream.orderedWait will not help here since this affects only
> the way Flink "releases" the messages from its internal queue for
> Async
> operators.
>
>
> What is more, this scenario can result in multiple concurrent
> writes/reads
> to/from Flink's managed state for the same key values. Is this thread safe?
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Tools for Flink Job performance testing

2020-08-13 Thread Arvid Heise
Hi,

performance testing is quite vague. Usually you start by writing a small
first version of your pipeline and check how well the computation scales on
your data. Flink's web UI [1] already helps quite well at first.
Usually you'd also add some metrics system and look for advanced metrics in
there [2].

Now, you might be satisfied with your current solution and just start to
extend it. Or you feel that it's not fast enough or not scaling well enough. Then
you can tweak your pipeline and perform smaller performance tests on your
user code. Doing performance tests on the whole pipeline [3] would probably
be hard if you just started as you also need to understand internals of
Flink.

I also recommend to use Table API / SQL instead of DataStream if your
application can be expressed well in relational operations. Table API
already applies a wide range of optimizations that are much harder to
implement manually in DataStream API. Table API will also bring you
noticeable performance improvements over time when you update to a newer
Flink version.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/application_profiling.html

On Mon, Aug 10, 2020 at 1:06 PM narasimha  wrote:

> Hi,
>
> I'm new to the streaming world, checking on Performance testing tools.
> Are there any recommended Performance testing tools for Flink?
>
> --
> A.Narasimha Swamy
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Lu Weizheng
Thanks Timo,

So there is no need to use the execute() method in Flink SQL if I do all the things from
source to sink in SQL.

Best Regards,
Lu

> On 2020-08-13 at 3:41 PM, Timo Walther wrote:
> 
> Hi Lu,
> 
> `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method 
> that has `execute` in its name will immediately execute a job. Therefore your 
> `env.execute` has an empty pipeline.
> 
> Regards,
> Timo
> 
> [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 
> On 13.08.20 09:34, Lu Weizheng wrote:
>> Hi,
>> I am using Flink 1.11 SQL using java. All my operations are in SQL. I create 
>> source tables and insert result into sink tables. No other Java operators. I 
>> execute it in Intellij. I can get the final result in the sink tables. 
>> However I get the following error. I am not sure it is a bug or there is 
>> something wrong in my code? Acutally it does not affect the computation.
>> /Exception in thread "main" java.lang.IllegalStateException: No operators 
>> defined in streaming topology. Cannot execute./
>> /at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/
>> /at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/
>> /at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/
>> /at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/
>> /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/
>> Here's my code:
>> EnvironmentSettings fsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
>> fsSettings);
>> // create source and sink tables...
>> tEnv.executeSql("INSERT INTO sensor_1min_avg " +
>> "SELECT " +
>> "  room, " +
>> "  AVG(temp) AS avg_temp," +
>> "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
>> "FROM sensor " +
>> "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
>> env.execute("table api");
> 



Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Timo Walther

Hi Lu,

`env.execute("table api");` is not necessary after FLIP-84 [1]. Every 
method that has `execute` in its name will immediately execute a job. 
Therefore your `env.execute` has an empty pipeline.


Regards,
Timo

[1] 
https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878


On 13.08.20 09:34, Lu Weizheng wrote:

Hi,

I am using Flink 1.11 SQL using java. All my operations are in SQL. I 
create source tables and insert result into sink tables. No other Java 
operators. I execute it in Intellij. I can get the final result in the 
sink tables. However I get the following error. I am not sure it is a 
bug or there is something wrong in my code? Acutally it does not affect 
the computation.


/Exception in thread "main" java.lang.IllegalStateException: No 
operators defined in streaming topology. Cannot execute./
/at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/
/at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/
/at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/
/at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/

/at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/

Here's my code:

         EnvironmentSettings fsSettings =  
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, fsSettings);

         // create source and sink tables...

         tEnv.executeSql("INSERT INTO sensor_1min_avg " +
                 "SELECT " +
                 "  room, " +
                 "  AVG(temp) AS avg_temp," +
                 "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
                 "FROM sensor " +
                 "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");

         env.execute("table api");






Re: State Processor API to boot strap keyed state for Stream Application.

2020-08-13 Thread Arvid Heise
For future readers: this thread has been resolved in "Please help, I need
to bootstrap keyed state into a stream" on the user mailing list asked by
Marco.

On Fri, Aug 7, 2020 at 11:52 PM Marco Villalobos 
wrote:

> I have read the documentation and various blogs that state that it is
> possible to load data into a data-set and use that data to bootstrap a
> stream application.
>
> The documentation literally says this, "...you can read a batch of data
> from any store, preprocess it, and write the result to a savepoint that you
> use to bootstrap the state of a streaming application." (source:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html
> ).
>
> Another blog states, "You can create both Batch and Stream environment in
> a single job." (source:
> https://www.kharekartik.dev/2019/12/14/bootstrap-your-flink-jobs/
>
> I want to try this approach, but I cannot find any real examples online.
>
> I have failed on numerous attempts.
>
> I have a few questions:
>
> 1) is there an example that demonstrate this feature?
> 2) how can you launch batch and stream environment from a single job?
> 3) does this require two jobs?
>
> Anybody, please help.
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Is there a way to start a timer without ever receiving an event?

2020-08-13 Thread Timo Walther
What you can do is create an initial control stream, e.g. using
`StreamExecutionEnvironment.fromElements()`, and either use
`union(controlStream, actualStream)` or
`actualStream.connect(controlStream)`.
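
A small sketch of that idea (the Event type, its `key` field and the factory
method are illustrative, as are `env` and `actualStream`); the control
elements carry the keys you want to bootstrap, so every such key sees at
least one element and can register its first timer:

DataStream<Event> control = env.fromElements(
    Event.controlEventFor("key-1"), Event.controlEventFor("key-2"));
DataStream<Event> all = actualStream.union(control);

all.keyBy(e -> e.key)
   .process(new KeyedProcessFunction<String, Event, String>() {
       @Override
       public void processElement(Event e, Context ctx, Collector<String> out) {
           // a control element merely bootstraps the timer for keys that have no real data yet
           ctx.timerService().registerProcessingTimeTimer(
               ctx.timerService().currentProcessingTime() + 60_000L);
       }

       @Override
       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
           out.collect("timer fired for key " + ctx.getCurrentKey());
       }
   });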


Regards,
Timo


On 12.08.20 18:15, Andrey Zagrebin wrote:
I do not think so. Each timer in KeyedProcessFunction is associated with 
the key. The key is implicitly set into the context from the record 
which is currently being processed.


On Wed, Aug 12, 2020 at 8:00 AM Marco Villalobos wrote:


In the Stream API KeyedProcessFunction, is there a way to start a
timer without ever receiving a stream event?





Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Lu Weizheng
Hi,

I am using Flink 1.11 SQL with Java. All my operations are in SQL. I create
source tables and insert results into sink tables. There are no other Java operators. I
execute it in IntelliJ. I can get the final result in the sink tables. However
I get the following error. I am not sure whether it is a bug or there is something
wrong in my code. Actually it does not affect the computation.

Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()

Here's my code:

EnvironmentSettings fsSettings =   
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
fsSettings);

// create source and sink tables...

tEnv.executeSql("INSERT INTO sensor_1min_avg " +
"SELECT " +
"  room, " +
"  AVG(temp) AS avg_temp," +
"  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
"FROM sensor " +
"GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");

env.execute("table api");




Re: Flink CPU load metrics in K8s

2020-08-13 Thread Arvid Heise
Hi Abhinav,

according to [1], you need 8u261 for the OperatingSystemMXBean to work as
expected.

[1] https://bugs.openjdk.java.net/browse/JDK-8242287
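
If you want to check what that bean reports inside your container image, a
tiny standalone probe can help (illustrative only, not part of Flink):

import java.lang.management.ManagementFactory;

public class CpuLoadProbe {
    public static void main(String[] args) throws InterruptedException {
        com.sun.management.OperatingSystemMXBean os =
                (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        for (int i = 0; i < 10; i++) {
            // Value in [0.0, 1.0], relative to all CPUs visible to the JVM,
            // not to the container's CPU quota.
            System.out.println("processCpuLoad = " + os.getProcessCpuLoad()
                    + ", availableProcessors = " + Runtime.getRuntime().availableProcessors());
            Thread.sleep(1000);
        }
    }
}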

On Thu, Aug 13, 2020 at 1:10 AM Bajaj, Abhinav 
wrote:

> Thanks Xintong for your input.
>
>
>
> From the information I could find, I understand the JDK version 1.8.0_212
> we use includes the docker/container support.
>
> I also had a quick test inside the docker image using the below –
>
> Runtime.getRuntime().availableProcessors()
>
>
>
> It showed the right number of CPU cores associated to container.
>
>
>
> But I am not familiar with OperatingSystemMXBean used by Flink.
>
> So I don’t know if it will pick up docker CPU limits set by K8s or not. I
> will continue to investigate that.
>
>
>
> In the meantime, the K8s metric container_cpu_usage_seconds_total does seem
> to provide the expected CPU usage for the containers.
>
>
>
>
>
> I was hoping that someone in the community may have already run into this
> behavior on K8s and can share their specific experience 😊.
>
>
>
> Thanks much.
>
> ~ Abhinav Bajaj
>
>
>
> *From: *Xintong Song 
> *Date: *Wednesday, August 12, 2020 at 3:56 AM
> *To: *"Bajaj, Abhinav" 
> *Cc: *"user@flink.apache.org" , Roman Grebennikov <
> g...@dfdx.me>
> *Subject: *Re: Flink CPU load metrics in K8s
>
>
>
> Hi Abhinav,
>
>
>
> Do you know how many total cpus does the physical machine have where the
> kubernetes container is running?
>
>
>
> I'm asking because I doubt whether the JVM is aware that only 1 cpu is
> configured for the container. It does not work in a way where the JVM understands
> how many cpus are configured and controls itself to not use more than that. On
> the other hand, the JVM tries to use as much cpu time as possible, and the
> limit comes from outside (OS, docker, cgroup, ...).
>
>
>
> Please understand that docker containers are not virtual machines. They do
> not "pretend" to only have certain hardwares. I did a simple test on my
> laptop, launching a docker container with cpu limit configured. Inside the
> container, I can still see all my machine's cpus.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Wed, Aug 12, 2020 at 1:19 AM Bajaj, Abhinav 
> wrote:
>
> Hi,
>
>
>
> Reaching out to folks running Flink on K8s.
>
>
>
> ~ Abhinav Bajaj
>
>
>
> *From: *"Bajaj, Abhinav" 
> *Date: *Wednesday, August 5, 2020 at 1:46 PM
> *To: *Roman Grebennikov , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Flink CPU load metrics in K8s
>
>
>
> Thanks Roman for providing the details.
>
>
>
> I also made more observations that have increased my confusion about this
> topic 😝
>
> To ease the calculations, I deployed a test cluster this time providing 1
> CPU in K8s (with docker) for all the taskmanager containers.
>
>
>
> When I check the taskmanager CPU load, the value is in the order of
> "0.002158428663932657".
>
> Assuming that the underlying JVM recognizes 1 CPU allocated to the docker
> container, this value means a CPU usage in the ballpark of 0.21%.
>
>
>
> However, if I look at the K8s metrics (formula below) for this container,
> it turns out to be in the ballpark of 10-16%.
>
> There is no other process running in the container apart from the flink
> taskmanager.
>
>
>
> The order of these two values of CPU % usage is different.
>
>
>
> *Am I comparing the right metrics here?*
>
> *How are folks running Flink on K8s monitoring the CPU load?*
>
>
>
> ~ Abhi
>
>
>
> *% CPU usage from K8s metrics*
>
> sum(rate(container_cpu_usage_seconds_total{pod=~"my-taskmanagers-*",
> container="taskmanager"}[5m])) by (pod)
>
> / sum(container_spec_cpu_quota{pod=~"my-taskmanager-pod-*",
> container="taskmanager"}
>
> / container_spec_cpu_period{pod=~"my-taskmanager-pod-*",
> container="taskmanager"}) by (pod)
>
>
>
> *From: *Roman Grebennikov 
> *Date: *Tuesday, August 4, 2020 at 12:42 AM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: Flink CPU load metrics in K8s
>
>
>
>
>
>
> Hi,
>
>
>
> JVM.CPU.Load is just a wrapper (MetricUtils.instantiateCPUMetrics) on top
> of OperatingSystemMXBean.getProcessCpuLoad (see
> https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad)
>
>
>
> Usually it looks weird if you have multiple CPU cores. For example, if you
> have a job with a single slot 100% utilizing a single CPU core on an 8-core
> machine, the JVM.CPU.Load will be 1.0/8.0 = 0.125. It's also a
> point-in-time snapshot