Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-26 Thread Leon Xu
Hi Flink Users,

We are trying to upgrade Flink from 1.12.7 to 1.16.0, but we have run into
the following issue: we run our Flink jobs in application mode, and after
the upgrade, job submission fails with this exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    ...
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp': Input/output error
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[hadoop-common-3.3.3.jar!/:?]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[hadoop-common-3.3.3.jar!/:?]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    ... 35 more

It looks like Flink failed to upload the temporary flink-conf file to S3.
In Flink 1.12.7 we did not have this issue. I am wondering if we can get
some help here.
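
For context, we submit the job roughly like this (a sketch; the jar path
and name are illustrative):

  ./bin/flink run-application -t yarn-application ./my-job.jar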

Here are the versions we are using:
Flink 1.16.0
Hadoop 3.3.3

Thanks
Leon


Updating Parallelism based on Traffic and Max Parallelism

2023-01-26 Thread Madan D via user
Hello Team,

I am fine-tuning my application so that it can adjust parallelism
automatically based on traffic, with EMR auto-scaling adding more cores as
needed. I am trying to set max parallelism, but the job always runs with
the default parallelism irrespective of traffic.

Can you please let me know how Flink can adjust parallelism automatically
based on traffic (HPA)?
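
Here is roughly how I am setting it (a sketch; the class name and numbers
are illustrative):

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class ParallelismSetup {
      public static void main(String[] args) {
          StreamExecutionEnvironment env =
                  StreamExecutionEnvironment.getExecutionEnvironment();
          // Upper bound for any future rescaling (this fixes the number
          // of key groups); it does not change the running parallelism.
          env.setMaxParallelism(128);
          // The parallelism the job actually runs with.
          env.setParallelism(4);
      }
  }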

Regards,
Madan


Re: OOM taskmanager

2023-01-26 Thread Teoh, Hong
Hi Marco,

When you say OOM, I assume you mean the TM pod being OOMKilled, is that
correct? If so, this usually means that the TM is using more memory than is
actually allocated to the pod. First I would check your memory configuration
to figure out where this extra memory use is coming from. This is a
non-trivial task, and I'll list some common situations I've seen in the past
to get you started.


  *   Misconfigured process memory. The Flink configuration
`taskmanager.memory.process.size` sets the memory of the entire TM, which
Flink breaks down into smaller buckets. If this is higher than the memory
resource of the container, it will cause OOMKilled situations (see the
sketch after this list).
  *   User code has a memory leak (e.g. spins up too many threads). It would
be useful to test your Flink job on a local cluster and monitor its memory
use.
  *   State backend (if you use RocksDB) using too much memory.
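
As a sketch of the first point, the relationship between the Flink setting
and the pod spec looks roughly like this (the numbers are illustrative):

  # flink-conf.yaml
  taskmanager.memory.process.size: 4g

  # Pod template (sketch): the container limit should be at least
  # process.size, since process.size already includes heap, managed
  # memory, network buffers, metaspace and JVM overhead.
  # resources:
  #   limits:
  #     memory: "4Gi"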

You can also look at [1] and [2] for more information.

Regards,
Hong

[1] Talk on Flink memory utilisation https://www.youtube.com/watch?v=F5yKSznkls8
[2] Flink description of TM memory breakdown 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/


From: marco andreas 
Date: Wednesday, 25 January 2023 at 19:57
To: user 
Subject: [EXTERNAL] OOM taskmanager


Hello,

We are deploying a Flink application cluster in Kubernetes: two pods, one
for the JM and the other for the TM.

The problem is that when we run load tests, the task manager's memory usage
increases, and after the tests finish and Flink stops processing data, the
memory usage never comes back down to where it was before. As we run the
tests again and again, the TM's memory keeps growing until it reaches the
memory resource limit specified in the container template and the pod gets
OOMKilled.


Has anyone faced the same issue? What is the best way to investigate this in
order to find the root cause of why the TM's memory usage never comes down
after Flink finishes processing?

Flink version is 1.16.0.
Thanks,


Re: Docker image Flink 1.15.4

2023-01-26 Thread Chesnay Schepler

1.15.4 is not released yet.

On 26/01/2023 16:06, Peng Zhang wrote:

Hi,

We would like to use the Flink 1.15.4 Docker image. The latest seems to be
1.15.3. Could you make a Docker release for Flink 1.15.4? Thanks!


There is a blocking bug, https://issues.apache.org/jira/browse/FLINK-28695,
in 1.15.3 that is fixed in 1.15.4.


BR,
Peng



Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Ah, oops, my original email had a typo:
> Some python dependencies are not included in the flink distribution
tarballs: cloudpickle, py4j and pyflink are in opt/python.

Should read:
> Some python dependencies ARE included in the flink distribution tarballs:
cloudpickle, py4j and pyflink are in opt/python.
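
(In case it's useful: in principle those bundled zips can be put on
PYTHONPATH directly, as in the sketch below, though the exact zip names
vary by release and other dependencies such as protobuf would still be
missing.)

  export FLINK_HOME=/opt/flink
  # Version suffixes are illustrative; check opt/python for the real names.
  PY="$FLINK_HOME/opt/python"
  export PYTHONPATH="$PY/pyflink.zip:$PY/py4j-0.10.9.7-src.zip:$PY/cloudpickle-2.1.0-src.zip:$PYTHONPATH"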

On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto  wrote:

> Let me ask a related question:
>
> We are building our own base Flink docker image.  We will be deploying
> both JVM and python apps via flink-kubernetes-operator.
>
> Is there any reason not to install Flink in this image via `pip install
> apache-flink` and use it for JVM apps?
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:
>
>> Hello,
>>
>> I'm having quite a bit of trouble running pyflink from the default flink
>> distribution tarballs.  I'd expect the python examples to work as long as
>> python is installed, and we've got the distribution.  Some python
>> dependencies are not included in the flink distribution tarballs:
>> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
>> protobuf.
>>
>> Now that I'm looking, I see that the pyflink installation instructions
>> are to install via pip.
>>
>> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
>> the Using Flink Python on Docker instructions,
>> there is a pip3 install apache-flink step.  I find this strange, since I'd
>> expect the 'FROM flink:1.15.2'  part to be sufficient.
>>
>> By pip installing apache-flink, this docker image will have the flink
>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink.
>> BUT ALSO flink lib jars will be installed at e.g.
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>> So, by following those instructions, flink is effectively installed twice
>> into the docker image.
>>
>> Am I correct or am I missing something?
>>
>> Is using pyflink from the flink distribution tarball (without pip) not a
>> supported way to use pyflink?
>>
>> Thanks!
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>


Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Let me ask a related question:

We are building our own base Flink docker image.  We will be deploying both
JVM and python apps via flink-kubernetes-operator.

Is there any reason not to install Flink in this image via `pip install
apache-flink` and use it for JVM apps?
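
For reference, our image build follows roughly the pattern from the docs'
Python-on-Docker page (a sketch; the version tags are illustrative):

  FROM flink:1.15.2
  # Install Python, then pyflink itself; this pip step is what duplicates
  # the Flink distribution under dist-packages.
  RUN apt-get update -y && \
      apt-get install -y python3 python3-pip python3-dev && \
      rm -rf /var/lib/apt/lists/*
  RUN ln -sf /usr/bin/python3 /usr/bin/python
  RUN pip3 install apache-flink==1.15.2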

-Andrew Otto
 Wikimedia Foundation



On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:

> Hello,
>
> I'm having quite a bit of trouble running pyflink from the default flink
> distribution tarballs.  I'd expect the python examples to work as long as
> python is installed, and we've got the distribution.  Some python
> dependencies are not included in the flink distribution tarballs:
> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
> protobuf.
>
> Now that I'm looking, I see that the pyflink installation instructions
> are to install via pip.
>
> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
> the Using Flink Python on Docker instructions,
> there is a pip3 install apache-flink step.  I find this strange, since I'd
> expect the 'FROM flink:1.15.2'  part to be sufficient.
>
> By pip installing apache-flink, this docker image will have the flink
> distro installed at /opt/flink and FLINK_HOME set to /opt/flink.
> BUT ALSO flink lib jars will be installed at e.g.
> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
> So, by following those instructions, flink is effectively installed twice
> into the docker image.
>
> Am I correct or am I missing something?
>
> Is using pyflink from the flink distribution tarball (without pip) not a
> supported way to use pyflink?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Docker image Flink 1.15.4

2023-01-26 Thread Peng Zhang
Hi,

We would like to use the Flink 1.15.4 Docker image. The latest seems to be
1.15.3. Could you make a Docker release for Flink 1.15.4? Thanks!

There is a blocking bug, https://issues.apache.org/jira/browse/FLINK-28695,
in 1.15.3 that is fixed in 1.15.4.

BR,
Peng


Re: Using S3 as stream source in Flink

2023-01-26 Thread Sriram Ganesh
I saw in aws-samples
(https://github.com/aws-samples/flink-stream-processing-refarch/blob/master/kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/TaxiEventReader.java)
that they are not using FileSource.

Now I got it. Thanks, Martijn.
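
For anyone who finds this thread later, here is a minimal unbounded
FileSource sketch over S3 (the bucket, format and poll interval are
illustrative, and it assumes the flink-s3-fs-hadoop plugin is configured):

  import java.time.Duration;

  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.connector.file.src.FileSource;
  import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
  import org.apache.flink.core.fs.Path;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class S3StreamingSourceExample {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env =
                  StreamExecutionEnvironment.getExecutionEnvironment();
          env.enableCheckpointing(60_000);
          // Poll the S3 prefix for new files every 30 seconds; checkpoints
          // record which files/splits have been processed, which is what
          // gives the exactly-once guarantee Martijn mentions.
          FileSource<String> source =
                  FileSource.forRecordStreamFormat(
                                  new TextLineInputFormat(),
                                  new Path("s3://my-bucket/input/"))
                          .monitorContinuously(Duration.ofSeconds(30))
                          .build();
          DataStream<String> lines =
                  env.fromSource(source,
                          WatermarkStrategy.noWatermarks(), "s3-file-source");
          lines.print();
          env.execute("s3-streaming-source-example");
      }
  }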

On Wed, Jan 25, 2023 at 9:07 PM Martijn Visser wrote:

> Hi Sriram G,
>
> Both the DataStream and Table API support filesystem as a source in
> unbounded (streaming) mode with exactly-once guarantees. This is documented
> at
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> and
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
>
> Best regards,
>
> Martijn
>
> On Tue, Jan 24, 2023 at 10:26, Sriram Ganesh wrote:
>
>> Hi Everyone,
>>
>> I am thinking of switching my input source from Kafka to S3, but I
>> couldn't find any streaming source connector for S3. I have some basic
>> questions:
>>
>> 1. How will S3 work as a streaming source with proper checkpointing?
>> 2. How will Flink manage the last offset processed from a file?
>> 3. Is exactly_once possible while using S3 as a streaming source?
>> 4. What could be the pros and cons of using S3-style storage as a
>> streaming source?
>>
>> Any help would be appreciated. Thanks in advance.
>>
>> Thanks,
>> --
>> *Sriram G*
>> *Tech*
>>
>>

-- 
*Sriram G*
*Tech*