Re: asynchronous io question

2021-10-05 Thread Nicolaus Weidner
Hi Tom,

On Mon, Oct 4, 2021 at 10:42 PM tom yang  wrote:

> Hello,
>
>
>
> I have recently run into an issue with RichAsyncFunction and wanted to
> get some guidance from the community.
>
>
>
> Please see snippet
>
>
>
> class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {
> 2
> 3    private transient AsyncHttpClient client;
> 4
> 5    @Override
> 6    public void open(Configuration parameters) throws Exception {
> 7        client = new AsyncHttpClient();
> 8    }
> 9
> 10   @Override
> 11   public void close() throws Exception {
> 12       client.close();
> 13   }
> 14
> 15   @Override
> 16   public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
> 17
> 18       // issue the asynchronous request, receive a future for the result
> 19       CompletableFuture<HttpResponse<String>> future = httpClient
>              .sendAsync(request, HttpResponse.BodyHandlers.ofString());
> 20
> 21       future.whenCompleteAsync((response, throwable) -> {
> 22           if (throwable != null) {
> 23
> 24               resultFuture.completeExceptionally(throwable);
> 25           }
> 26           else {
> 27               if (response.statusCode() == HttpStatus.SC_OK) {
> 28                   resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())));
> 29               }
> 30               else if (response.statusCode() == HttpStatus.SC_NOT_FOUND) {
> 32                   resultFuture.complete(Collections.emptyList());
> 33               }
> 34               else {
> 35                   resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
> 36               }
> 37           }
> 38
> 39       });
> 40
> 41
> 42   }
> 43 }
>
>
>
> 1. If the future completes exceptionally, i.e.
> resultFuture.completeExceptionally(throwable), does the input message get
> discarded?
>

Which input do you mean here, "request"? It is not defined in your snippet;
did it get lost when trimming unimportant parts?
By default, you will get only the contained throwable; you would have to
enrich it with the input if you want to retain it.
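
For illustration, a minimal sketch of what I mean by enriching (the
exception type and message are just examples):

resultFuture.completeExceptionally(
    new RuntimeException("Async request failed for input: " + key, throwable));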

> 2. Should the request be made on a dedicated ExecutorService, or is the
> common ForkJoinPool sufficient?
>

I don't see a good reason in general to use a separate thread pool for the
requests here. They are async (non-blocking), are part of your Flink job,
and run on your TaskManagers. Unless there is something special in your
setup that makes you suspect they block other tasks...
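
For completeness: if you did decide on a dedicated pool,
CompletableFuture.whenCompleteAsync accepts an executor as a second
argument. A minimal sketch, with the pool size chosen arbitrarily:

// field:      private transient ExecutorService callbackExecutor;
// in open():  callbackExecutor = Executors.newFixedThreadPool(4);
// in close(): callbackExecutor.shutdown();
future.whenCompleteAsync(
    (response, throwable) -> { /* handle as above */ }, callbackExecutor);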


> 3. If the rest api service for example returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?
>

This depends on your desired behavior: Do you want it to complete normally,
but without any results (this is your current state), or do you want it to
complete exceptionally?

Best regards,
Nico


>
>
> Thanks!
>
>
>
>
>


Re: New session mode job manager deployment rejected existing task managers

2021-10-05 Thread Sharon Xie
Actually we figured it out. We need to configure High Availability mode to
recover jobs during a new Kubernetes deployment.
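
For anyone hitting this later, the relevant flink-conf.yaml entries look
roughly like this (cluster id and storage path are placeholders):

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/recovery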

On Tue, Oct 5, 2021 at 11:39 AM Sharon Xie  wrote:

> Hi,
>
> I'm currently running Flink 1.13.2 using kubernetes session mode - native
> kubernetes. When I update the job manager deployment through `kubectl apply
> flink-jobmanager-deployment.yaml`, a new job manager pod is created. I'd
> expect all the task manager pods to re-register with the new JM pod.
> However the new JM pod rejected all the existing task managers that were
> running before the update. It looks like the new JM deployment does not
> recognize the existing TM pods. Is this expected? If so, how can I
> configure the deployment to recover the existing TMs?
>
>
> Thanks,
> Sharon
>
> JM logs:
>
> 2021-10-05 18:00:53,011 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Registering TaskManager with ResourceID
> X-flink-cluster-local-taskmanager-1-1 (akka.tcp://
> flink@10.244.0.191:6122/user/rpc/taskmanager_0) at ResourceManager
>
> 2021-10-05 18:00:53,033 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Registering TaskManager with ResourceID
> X-flink-cluster-local-taskmanager-1-1 (akka.tcp://
> flink@10.244.0.191:6122/user/rpc/taskmanager_0) at ResourceManager
>
> 2021-10-05 18:00:53,046 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Worker XX-flink-cluster-local-taskmanager-1-1 is registered.
>
> 2021-10-05 18:01:45,835 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Stopping worker X-flink-cluster-local-taskmanager-1-1.
>
> 2021-10-05 18:01:45,835 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
> Stopping TaskManager pod XX-flink-cluster-local-taskmanager-1-1.
>
> 2021-10-05 18:01:45,837 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Closing TaskExecutor
> connection XX-flink-cluster-local-taskmanager-1-1 because: TaskExecutor
> exceeded the idle timeout.
>
> 2021-10-05 18:01:45,877 WARN  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
> [] - Discard registration from TaskExecutor
> X-flink-cluster-local-taskmanager-1-1 at (akka.tcp://
> flink@10.244.0.191:6122/user/rpc/taskmanager_0) because the framework did
> not recognize it
>
>
>
> TM logs:
>
> 2021-10-05 18:01:45,843 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor
>   [] - Close ResourceManager connection
> 9f664a154b1924918b46d41016324a74.
>
> 2021-10-05 18:01:45,844 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor
>   [] - Connecting to ResourceManager
> akka.tcp://flink@X-flink-cluster-service
> :6123/user/rpc/resourcemanager_*().
>
> 2021-10-05 18:01:45,856 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor
>   [] - Resolved ResourceManager address, beginning registration
>
> 2021-10-05 18:01:45,883 ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Fatal
> error occurred in TaskExecutor akka.tcp://
> flink@10.244.0.191:6122/user/rpc/taskmanager_0.
>
> org.apache.flink.util.FlinkException: The TaskExecutor's registration at
> the ResourceManager 
> akka.tcp://flink@X-flink-cluster-service:6123/user/rpc/resourcemanager_*
> has been rejected: Rejected TaskExecutor registration at the ResourceManger
> because: The ResourceManager does not recognize this TaskExecutor.
>
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-05 Thread Kevin Lam
I was reading a bit about RocksDB and it seems the Java version is somewhat
particular about how it should be closed to ensure all resources are
cleaned up:


https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management


   - "Many of the Java Objects used in the RocksJava API will be backed by
   C++ objects for which the Java Objects have ownership. As C++ has no notion
   of automatic garbage collection for its heap in the way that Java does, we
   must explicitly free the memory used by the C++ objects when we are
   finished with them."

Column families also have a specific close procedure


https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families

   - "It is important to note that when working with Column Families in
   RocksJava, there is a very specific order of destruction that must be
   obeyed for the database to correctly free all resources and shutdown."
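
As an illustration, a minimal sketch of that close order with the RocksJava
API (org.rocksdb.*; the path and column family descriptors are placeholders):

List<ColumnFamilyHandle> handles = new ArrayList<>();
DBOptions options = new DBOptions();
RocksDB db = RocksDB.open(options, "/path/to/db", descriptors, handles);
// ... use the database ...
// documented order: column family handles first, then the db, then the options
for (ColumnFamilyHandle handle : handles) {
    handle.close();
}
db.close();
options.close();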

When a running job fails and a running TaskManager restores from a
checkpoint, is the old embedded RocksDB instance cleaned up properly? I
wasn't sure where to look in the Flink source code to verify this.

On Mon, Oct 4, 2021 at 4:56 PM Kevin Lam  wrote:

> We tried with 1.14.0, unfortunately we still run into the issue. Any
> thoughts or suggestions?
>
> On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam  wrote:
>
>> Hi Fabian,
>>
>> We're using our own image built from the official Flink docker image, so
>> we should have the code to use jemalloc in the docker entrypoint.
>>
>> I'm going to give 1.14 a try and will let you know how it goes.
>>
>> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul 
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> We bumped the RocksDB version in Flink 1.14, which we expected to improve
>>> memory control [1]. In the past we also saw problems with the memory
>>> allocator used by the OS. We switched to jemalloc within our docker images,
>>> which suffers less from memory fragmentation [2]. Are you using the official
>>> Flink docker image, or did you build your own?
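>>>
>>> If you build on the official image, note the entrypoint's allocator
>>> switch. A minimal sketch, assuming the DISABLE_JEMALLOC toggle of the
>>> official docker-entrypoint.sh, to make sure jemalloc stays enabled:
>>>
>>> FROM flink:1.14.0-scala_2.12-java11
>>> # jemalloc is the default in the official entrypoint; keep it enabled
>>> ENV DISABLE_JEMALLOC=false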
>>>
>>> I am also pulling in Yun Tang, who is more familiar with Flink’s state
>>> backend. Maybe he has an immediate idea about your problem.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>>> [2]
>>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>>> 
>>>
>>>
>>>


New session mode job manager deployment rejected existing task managers

2021-10-05 Thread Sharon Xie
Hi,

I'm currently running Flink 1.13.2 using kubernetes session mode - native
kubernetes. When I update the job manager deployment through `kubectl apply
flink-jobmanager-deployment.yaml`, a new job manager pod is created. I'd
expect all the task manager pods to re-register with the new JM pod.
However the new JM pod rejected all the existing task managers that were
running before the update. It looks like the new JM deployment does not
recognize the existing TM pods. Is this expected? If so, how can I
configure the deployment to recover the existing TMs?


Thanks,
Sharon

JM logs:

2021-10-05 18:00:53,011 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Registering TaskManager with ResourceID
X-flink-cluster-local-taskmanager-1-1 (akka.tcp://
flink@10.244.0.191:6122/user/rpc/taskmanager_0) at ResourceManager

2021-10-05 18:00:53,033 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Registering TaskManager with ResourceID
X-flink-cluster-local-taskmanager-1-1 (akka.tcp://
flink@10.244.0.191:6122/user/rpc/taskmanager_0) at ResourceManager

2021-10-05 18:00:53,046 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Worker XX-flink-cluster-local-taskmanager-1-1 is registered.

2021-10-05 18:01:45,835 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Stopping worker X-flink-cluster-local-taskmanager-1-1.

2021-10-05 18:01:45,835 INFO
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Stopping
TaskManager pod XX-flink-cluster-local-taskmanager-1-1.

2021-10-05 18:01:45,837 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Closing TaskExecutor
connection XX-flink-cluster-local-taskmanager-1-1 because: TaskExecutor
exceeded the idle timeout.

2021-10-05 18:01:45,877 WARN
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
[] - Discard registration from TaskExecutor
X-flink-cluster-local-taskmanager-1-1 at (akka.tcp://
flink@10.244.0.191:6122/user/rpc/taskmanager_0) because the framework did
not recognize it



TM logs:

2021-10-05 18:01:45,843 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor
  [] - Close ResourceManager connection
9f664a154b1924918b46d41016324a74.

2021-10-05 18:01:45,844 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor
  [] - Connecting to ResourceManager
akka.tcp://flink@X-flink-cluster-service
:6123/user/rpc/resourcemanager_*().

2021-10-05 18:01:45,856 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor
  [] - Resolved ResourceManager address, beginning registration

2021-10-05 18:01:45,883 ERROR
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Fatal
error occurred in TaskExecutor akka.tcp://
flink@10.244.0.191:6122/user/rpc/taskmanager_0.

org.apache.flink.util.FlinkException: The TaskExecutor's registration at
the ResourceManager
akka.tcp://flink@X-flink-cluster-service:6123/user/rpc/resourcemanager_*
has been rejected: Rejected TaskExecutor registration at the ResourceManger
because: The ResourceManager does not recognize this TaskExecutor.


Re: Issues while upgrading from 1.12.1 to 1.14.0

2021-10-05 Thread Nicolaus Weidner
Hi Parag,

I am not so familiar with the setup you are using, but did you check out
[1]? Maybe the parameter
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]]
is what you are looking for?
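
Applied to the standalone-job entrypoint you quote below, that would look
roughly like this (the savepoint path is a placeholder; the job classname
is left elided as in your command):

/docker-entrypoint.sh "standalone-job" "--job-classname" "" \
    "--fromSavepoint" "s3://your-bucket/savepoints/savepoint-xxxx" \
    "--allowNonRestoredState"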

Best regards,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#application-mode-on-docker

On Tue, Oct 5, 2021 at 12:37 PM Parag Somani  wrote:

> Hello,
>
> We are currently using Apache Flink 1.12.0 deployed on a Kubernetes 1.18
> cluster with ZooKeeper for HA. Due to certain vulnerabilities in the
> container related to a few jars (like netty-*, mesos), we are forced to
> upgrade.
>
> While upgrading Flink to 1.14.0, we faced an NPE:
> https://issues.apache.org/jira/browse/FLINK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17402570#comment-17402570
>
> To address it, I followed these steps:
>
>1. savepoint creation
>2. Stop the job
>3. Restore from save point where i am facing challenge.
>
> For step #3 above, restoring from a savepoint is mainly done via:
> "bin/flink run -s :savepointPath [:runArgs]"
> This is mostly about restarting an uploaded jar file. As our application
> is based on k8s and runs using docker, I was not able to restore it.
> Because of that, the state of variables in the accumulator got corrupted
> and I lost the data in one of our environments.
>
> My query is: what is the preferred way to restore from a savepoint if the
> application is running on k8s using docker?
>
> We are using following command to run job manager:
>  /docker-entrypoint.sh "standalone-job" "-Ds3.access-key=
> ${AWS_ACCESS_KEY_ID}" "-Ds3.secret-key=${AWS_SECRET_ACCESS_KEY}"
> "-Ds3.endpoint=${AWS_S3_ENDPOINT}" "-Dhigh-availability.zookeeper.quorum=
> ${ZOOKEEPER_CLUSTER}" "--job-classname" ""  ${args}
>
> Thank you in advance...!
>
> --
> Regards,
> Parag Surajmal Somani.
>


Re: k8s not able to submit job from jobmanager

2021-10-05 Thread Dhiru
I think I got the answer (an application mode cluster doesn't support
submitting jobs via the CLI).
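
For reference, the documented way to start a job in application mode is
from outside the cluster with run-application, roughly like this (cluster
id and image name are placeholders):

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=custom-image-name \
    local:///opt/flink/bin/WordCount.jar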
On Tuesday, October 5, 2021, 08:38:24 AM EDT, Israel Ekpo wrote:

Your Flink versions are different.

Your Docker container has version 1.13.2 but it seems your job is
attempting to submit with 1.14 in application mode.

That is the first obvious observation.
On Tue, Oct 5, 2021 at 5:35 AM Dhiru  wrote:

My DockerFile:

FROM flink:1.13.2-scala_2.12-java11

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-presto
RUN ln -fs /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/flink-s3-fs-presto/.

COPY WordCount.jar /opt/flink/bin/

I am trying to run the job manually from my local laptop and am able to run
the job successfully:

docker run -it images sh
./start-cluster.sh
flink run WordCount.jar

For Kubernetes:
Now, using the same image, I am trying to create a cluster in Application mode
(https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/)

Trying to use the same image and run from the Kubernetes jobmanager, I get
this error:

k exec -it flink-jobmanager-kzgwk sh -n ha

$ flink run WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist_2.12-1.13.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
09:30:38.530 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Streaming WordCount'.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Streaming WordCount'.

Is there anything I am missing?

  

Re: Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-05 Thread Nicolaus Weidner
Hi Kamil,

On Tue, Oct 5, 2021 at 9:03 AM Kamil ty  wrote:

> Hello,
>
> I'm trying to run a pyflink job in cluster mode (with yarn). My job
> contains source and sink definitions using Table API which are converted to
> a datastream and back. Unfortunately I'm getting an unusual exception at:
> table = t_env.from_data_stream(ds, 'user_id, first_name, last_name)
>

Just to make sure: Is the missing quotation mark just a typo in your mail,
or your code (right before the closing bracket)?
table = t_env.from_data_stream(ds, 'user_id, first_name, last_name['])

Best regards,
Nico


Snapshot method for custom keyed state checkpointing?

2021-10-05 Thread Marc LEGER
Hello,

Is there any method available in a RichFunction that is called by Flink with
a keyed context each time a checkpoint is triggered, please?

It seems that the CheckpointedFunction interface provides such a feature
(the snapshotState method), but only for operator state, and it is called
in a non-keyed context.
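
For reference, this is the interface I mean
(org.apache.flink.streaming.api.checkpoint.CheckpointedFunction):

public interface CheckpointedFunction {
    // called whenever a checkpoint is taken, but with a non-keyed context
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    void initializeState(FunctionInitializationContext context) throws Exception;
}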

Indeed, I am implementing a CoFlatMapFunction with:
- a keyed state (state1) for a "control" stream (stream1) which is not
often updated,
- a keyed state (state2) for a "data" stream (stream2) with a high
throughput and relying on a custom solution for internal state snapshot
with some potential performance impact.

Consequently, I don't want to trigger a state2 update for every event
received in stream2 for efficiency reasons but rather update state2 based
on checkpoints triggered by Flink.

Best Regards,
Marc


Re: k8s not able to submit job from jobmanager

2021-10-05 Thread Israel Ekpo
Your Flink versions are different

Your Docker container has version 1.13.2 but it seems your job is
attempting to submit with 1.14 in application mode

That is the first obvious observation

On Tue, Oct 5, 2021 at 5:35 AM Dhiru  wrote:

> My DockerFile:
>
> FROM flink:1.13.2-scala_2.12-java11
>
> RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
> RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar
> /opt/flink/plugins/flink-s3-fs-hadoop/.
>
> RUN mkdir -p /opt/flink/plugins/flink-s3-fs-presto
> RUN ln -fs /opt/flink/opt/flink-s3-fs-presto-*.jar
> /opt/flink/plugins/flink-s3-fs-presto/.
>
> COPY WordCount.jar  /opt/flink/bin/
>
>
> I am trying to run the job manually from my local laptop and am able to
> run the job successfully:
> docker run -it images sh
>
> ./start-cluster.sh
> flink run WordCount.jar
>
> For Kubernetes:
> Now, using the same image, I am trying to create a cluster in Application
> mode
> (
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
> )
>
>
> Trying to use the same image and run from the Kubernetes jobmanager, I get
> this error:
>
> k exec -it flink-jobmanager-kzgwk  sh -n ha
>
> $ flink run WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/opt/flink/lib/flink-dist_2.12-1.13.2.jar) to field
> java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> 09:30:38.530 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
> while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job 'Streaming WordCount'.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'Streaming WordCount'.
>
> Is there anything I am missing?
>
>


Issues while upgrading from 1.12.1 to 1.14.0

2021-10-05 Thread Parag Somani
Hello,

We are currently using Apache Flink 1.12.0 deployed on a Kubernetes 1.18
cluster with ZooKeeper for HA. Due to certain vulnerabilities in the
container related to a few jars (like netty-*, mesos), we are forced to
upgrade.

While upgrading Flink to 1.14.0, we faced an NPE:
https://issues.apache.org/jira/browse/FLINK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17402570#comment-17402570

To address it, I followed these steps:

   1. savepoint creation
   2. Stop the job
   3. Restore from save point where i am facing challenge.

For step #3 above, restoring from a savepoint is mainly done via:
"bin/flink run -s :savepointPath [:runArgs]"
This is mostly about restarting an uploaded jar file. As our application
is based on k8s and runs using docker, I was not able to restore it.
Because of that, the state of variables in the accumulator got corrupted
and I lost the data in one of our environments.

My query is: what is the preferred way to restore from a savepoint if the
application is running on k8s using docker?

We are using following command to run job manager:
 /docker-entrypoint.sh "standalone-job" "-Ds3.access-key=
${AWS_ACCESS_KEY_ID}" "-Ds3.secret-key=${AWS_SECRET_ACCESS_KEY}"
"-Ds3.endpoint=${AWS_S3_ENDPOINT}" "-Dhigh-availability.zookeeper.quorum=
${ZOOKEEPER_CLUSTER}" "--job-classname" ""  ${args}

Thank you in advance...!

-- 
Regards,
Parag Surajmal Somani.


k8s not able to submit job from jobmanager

2021-10-05 Thread Dhiru
My DockerFile:

FROM flink:1.13.2-scala_2.12-java11

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-presto
RUN ln -fs /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/flink-s3-fs-presto/.

COPY WordCount.jar /opt/flink/bin/

I am trying to run the job manually from my local laptop and am able to run
the job successfully:

docker run -it images sh
./start-cluster.sh
flink run WordCount.jar

For Kubernetes:
Now, using the same image, I am trying to create a cluster in Application mode
(https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/)

Trying to use the same image and run from the Kubernetes jobmanager, I get
this error:

k exec -it flink-jobmanager-kzgwk sh -n ha

$ flink run WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist_2.12-1.13.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
09:30:38.530 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Streaming WordCount'.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.12-1.13.2.jar:1.13.2]
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Streaming WordCount'.

Is there anything I am missing?


Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-05 Thread Kamil ty
Hello,

I'm trying to run a pyflink job in cluster mode (with yarn). My job
contains source and sink definitions using Table API which are converted to
a datastream and back. Unfortunately I'm getting an unusual exception at:
table = t_env.from_data_stream(ds, 'user_id, first_name, last_name)

The exception is:

Traceback (most recent call last):
  File "users_job.py", line 40, in <module>
    table = t_env.from_data_stream(ds, 'user_id, first_name, last_name)
  File "/jobs/venv/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1734, in from_data_stream
    JPythonConfigUtil.declareManagedMemory(
  File "/jobs/venv/lib/python3.7/site-packages/py4j/java_gateway.py", line 1516, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.python.util.PythonConfigUtil.declareManagedMemory does not exist in the JVM

Python version: 3.7 (venv built by the setup-python-environment.sh script
from documentation)
Flink version: 1.12.3

Any help would be appreciated.

Kind Regards
Kamil