[jira] [Created] (FLINK-35640) Drop Flink 1.15 support for the operator

2024-06-18 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35640:


 Summary: Drop Flink 1.15 support for the operator
 Key: FLINK-35640
 URL: https://issues.apache.org/jira/browse/FLINK-35640
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.10.0


As the operator only supports the latest 4 stable minor Flink versions, 1.15 
support should be dropped.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35493) Make max history age and count configurable for FlinkStateSnapshot resources

2024-05-30 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35493:


 Summary: Make max history age and count configurable for 
FlinkStateSnapshot resources
 Key: FLINK-35493
 URL: https://issues.apache.org/jira/browse/FLINK-35493
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35492) Add metrics for FlinkStateSnapshot resources

2024-05-30 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35492:


 Summary: Add metrics for FlinkStateSnapshot resources
 Key: FLINK-35492
 URL: https://issues.apache.org/jira/browse/FLINK-35492
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35267) Create documentation for FlinkStateSnapshot CR

2024-04-29 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35267:


 Summary: Create documentation for FlinkStateSnapshot CR
 Key: FLINK-35267
 URL: https://issues.apache.org/jira/browse/FLINK-35267
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.9.0


This should cover the new features and migration from the now deprecated 
methods of taking snapshots.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35266) Add e2e tests for FlinkStateSnapshot CRs

2024-04-29 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35266:


 Summary: Add e2e tests for FlinkStateSnapshot CRs
 Key: FLINK-35266
 URL: https://issues.apache.org/jira/browse/FLINK-35266
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.9.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35265) Implement FlinkStateSnapshot custom resource

2024-04-29 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35265:


 Summary: Implement FlinkStateSnapshot custom resource
 Key: FLINK-35265
 URL: https://issues.apache.org/jira/browse/FLINK-35265
 Project: Flink
  Issue Type: Sub-task
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.9.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35263) FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-29 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35263:


 Summary: FLIP-446: Kubernetes Operator State Snapshot CRD
 Key: FLINK-35263
 URL: https://issues.apache.org/jira/browse/FLINK-35263
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Mate Czagany
 Fix For: kubernetes-operator-1.9.0


Described in 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35106) Kubernetes Operator ignores checkpoint type configuration

2024-04-15 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35106:


 Summary: Kubernetes Operator ignores checkpoint type configuration
 Key: FLINK-35106
 URL: https://issues.apache.org/jira/browse/FLINK-35106
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0
Reporter: Mate Czagany


There is a configuration for checkpoint type that will be taken if perioid 
checkpointing is enabled or a manual checkpoint is triggered.

However, the configuration value `kubernetes.operator.checkpoint.type` is 
completely ignored when any checkpoint is triggered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34439) Move chown operations to COPY commands in Dockerfile

2024-02-13 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-34439:


 Summary: Move chown operations to COPY commands in Dockerfile
 Key: FLINK-34439
 URL: https://issues.apache.org/jira/browse/FLINK-34439
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Mate Czagany


We can lower the size of the output operator container image if we don't run 
'chown' commands in seperate RUN commands inside the Dockerfile, but instead 
use the '--chown' argument of the COPY command.

Using 'RUN chown...' will copy all the files affected with their whole size to 
a new layer, duplicating the previous files from the COPY command.

Example:
{code:java}
$ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8
...
     3 months ago  RUN /bin/sh -c chown -R flink:flink $FLINK...  
116MB       buildkit.dockerfile.v0
... {code}
This would mean a 20% reduction in image size.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode

2024-02-13 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-34438:


 Summary: Kubernetes Operator doesn't wait for TaskManager deletion 
in native mode
 Key: FLINK-34438
 URL: https://issues.apache.org/jira/browse/FLINK-34438
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.1, kubernetes-operator-1.7.0, 
kubernetes-operator-1.8.0
Reporter: Mate Czagany


This issue was partly fixed in FLINK-32334 but native mode was not included in 
the fix.

I don't see any downsides with adding the same check to native deployment mode, 
which would make sure that all TaskManagers were deleted when we shut down a 
Flink cluster.

There should also be some logs suggesting that the timeout was exceeded instead 
of silently returning when waiting for the cluster to shut down.

An issue was also mentioned on the mailing list which seems to be related to 
this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32652) Operator cannot scale standalone deployments in reactive mode

2023-07-23 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-32652:


 Summary: Operator cannot scale standalone deployments in reactive 
mode
 Key: FLINK-32652
 URL: https://issues.apache.org/jira/browse/FLINK-32652
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: Mate Czagany


After we upgraded the Fabric8 Kubernetes Client to 6.7.0 the operator can no 
longer scale standalone deployments in reactive mode because it uses the 
"deployments/scale" API instead of patching the deployment since this commit: 
[https://github.com/fabric8io/kubernetes-client/commit/c4d3dd14c6ba7261fe4646636d277cba1c2122a2]

We will get the following error: 
{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET 
at: 
https://10.96.0.1:443/apis/apps/v1/namespaces/flink/deployments/basic-reactive-example-taskmanager/scale.
 Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. deployments.apps 
"basic-reactive-example-taskmanager" is forbidden: User 
"system:serviceaccount:flink:flink-operator" cannot get resource 
"deployments/scale" in API group "apps" in the namespace "flink". {code}
The fix is easy, we just need to add "deployments/scale" to the ClusterRole we 
create, I'll create a PR soon



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31613) Some default operator config values are overwritten by values.yaml

2023-03-25 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-31613:


 Summary: Some default operator config values are overwritten by 
values.yaml
 Key: FLINK-31613
 URL: https://issues.apache.org/jira/browse/FLINK-31613
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Mate Czagany


It's a bit confusing that in the documentation it's stated that 
'kubernetes.operator.reconcile.interval' is 1 min by default and 
'kubernetes.operator.observer.progress-check.interval' is 10 sec when they are 
being overwritten to 15 sec and 5 sec respectively in the default values.yaml.

 

A possible solution might be to change the default values to 15 and 5 sec in 
the configuration values and remove/comment them in values.yaml, however this 
will introduce a change in configuration for users that have set a custom 
'defaultConfiguration.flink-conf.yaml' value.

 

Please let me know what you think and if the solution sounds good feel free to 
assign me this ticket and I'll create a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31326) Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0

2023-03-05 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-31326:


 Summary: Disabled source scaling breaks downstream scaling if 
source busyTimeMsPerSecond is 0
 Key: FLINK-31326
 URL: https://issues.apache.org/jira/browse/FLINK-31326
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Mate Czagany


In case of 'scaling.sources.enabled'='false' the 'TARGET_DATA_RATE' of the 
source vertex will be calculated as '(1000 / busyTimeMsPerSecond) * 
numRecordsOutPerSecond' which currently on the main branch results in an 
infinite value if 'busyTimeMsPerSecond' is 0. This will also affect downstream 
operators.

I'm not that familiar with the autoscaler code, but it's in my opinion it's 
quite unexpected to have such behavioral changes by setting 
'scaling.sources.enabled' to false.

 

With PR #543 for FLINK-30575 
(https://github.com/apache/flink-kubernetes-operator/pull/543) scaling will 
happen even with 'busyTimeMsPerSecond' being 0, but it will result in 
unreasonably high parallelism numbers for downstream operators because 
'TARGET_DATA_RATE' will be very high where 0 'busyTimeMsPerSecond' will be 
replaced with 1e-10.


Metrics from the operator logs (source=e5a72f353fc1e6bbf3bd96a41384998c, 
sink=51312116a3e504bccb3874fc80d5055e)

'scaling.sources.enabled'='true':
{code:java}
 jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: 
5.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: 
10.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: 
3.8667
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: 
3.8833

jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 
4.827299209321681
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 
4.848351269098938
jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: 
10.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: 
21.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: 
7.733
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: 
7.767{code}

'scaling.sources.enabled'='false':
{code:java}
 jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: 
NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: 
NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: NaN

jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 5.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 
4.9805556
jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: 
NaN
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: 
NaN
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: Infinity

[jira] [Created] (FLINK-31187) Standalone HA mode does not work if dynamic properties are supplied

2023-02-22 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-31187:


 Summary: Standalone HA mode does not work if dynamic properties 
are supplied
 Key: FLINK-31187
 URL: https://issues.apache.org/jira/browse/FLINK-31187
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0
Reporter: Mate Czagany
 Attachments: standalone-ha.yaml

With FLINK-30518 '--host $(POD_IP)' has been added to the arguments of the JMs 
which fixes the issue with HA on standalone mode, but it always gets appended 
to the end of the final JM arguments: 
https://github.com/usamj/flink-kubernetes-operator/blob/72ec9d384def3091ce50c2a3e2a06cded3b572e6/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java#L107

But this will not be parsed properly in case any dynamic properties were set in 
the arguments, e.g.:
{code:java}
 Program Arguments:
   --configDir
   /opt/flink/conf
   -D
   jobmanager.memory.off-heap.size=134217728b
   -D
   jobmanager.memory.jvm-overhead.min=201326592b
   -D
   jobmanager.memory.jvm-metaspace.size=268435456b
   -D
   jobmanager.memory.heap.size=469762048b
   -D
   jobmanager.memory.jvm-overhead.max=201326592b
   --job-classname
   org.apache.flink.streaming.examples.statemachine.StateMachineExample
   --test
   test
   --host
   172.17.0.11{code}
You can verify this bug by using the YAML I've attached and in the JM logs you 
can see this line: 
{code:java}
Remoting started; listening on addresses 
:[akka.tcp://flink@flink-example-statemachine.flink:6123]{code}
Without any program arguments supplied this would correctly be:
{code:java}
Remoting started; listening on addresses 
:[akka.tcp://flink@172.17.0.8:6123]{code}

I believe this could be easily fixed by appending the --host parameter before 
JobSpec.args and if a committer can assign this ticket to me I can create a PR 
for this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30899) FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition

2023-02-04 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-30899:


 Summary: FileSystemTableSource with CSV format incorrectly selects 
fields if filtering for partition
 Key: FLINK-30899
 URL: https://issues.apache.org/jira/browse/FLINK-30899
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.0
Reporter: Mate Czagany


In my testing it only affected csv and testcsv formats.

 

I think it's caused by `FileSystemTableSource` calling 
`DeserializationFormatFactory#createRuntimeDecoder` with wrong 
`physicalDataType`. The files won't contain the partitioned field values, but 
in case of a projection pushdown (which can happen during planning phase if we 
filter the partition field by a constant value) the final `physicalDataType` 
passed to the deserializer by `FileSystemTableSource` will contain the 
partitioned fields as well. As described in `DecodingFormat`, every field in 
the `physicalDataType` parameter will have to be present in the serialized 
record.

 

Example:
{code:java}
CREATE TABLE test_table (
  f0 INT,
  f1 INT,
  f2 INT,
  f3 INT
) PARTITIONED BY (f0,f1) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'csv'
)

SELECT * FROM test_table WHERE f0 = 1;
--  should be 1,4,7,10  
+-+-+-+-+
|          f0 |          f1 |          f2 |          f3 |
+-+-+-+-+
|           1 |           4 |          10 |           0 |
+-+-+-+-+

SELECT * FROM test_table;
+-+-+-+-+
|          f0 |          f1 |          f2 |          f3 |
+-+-+-+-+
|           2 |           5 |           8 |          11 |
|           1 |           4 |           7 |          10 |
|           3 |           6 |           9 |          12 |
+-+-+-+-+

SELECT * FROM test_table WHERE f0>0;
+-+-+-+-+
|          f0 |          f1 |          f2 |          f3 |
+-+-+-+-+
|           1 |           4 |           7 |          10 |
|           3 |           6 |           9 |          12 |
|           2 |           5 |           8 |          11 |
+-+-+-+-+

SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for 
length 4
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
    at 
org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
    at 
org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
    at 
org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
... {code}
At 
[https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147]
 the `physicalRowDataType` will contain the partition fields as well, but 
`partitionKeysToExtract` will not contain it since `producedDataType` has been 
modified in the `applyProjection` method, so it will result in an empty 
projection. Then on line 154 we construct the final `physicalDataType`, but 
since `partitionKeysProjections` is empty, it will result with the same value 
as `physicalDataType` which contains the partition fields too.

By changing
{code:java}
 final Projection partitionKeysProjections = 
Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);{code}
to
{code:java}
 final Projection partitionKeysProjections = 
Projection.fromFieldNames(physicalDataType, partitionKeys);{code}
the issue can be solved. I have verified this solution with 1 and 2 partition 
keys, with and without metadata columns, with and without virtual columns. But 
I still need to test this change with other formats.

 

If this solution seems correct and a committer could assign me to the JIRA I 
can start working on it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30812) YARN with S3 resource storage fails for Hadoop 3.3.2

2023-01-28 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-30812:


 Summary: YARN with S3 resource storage fails for Hadoop 3.3.2
 Key: FLINK-30812
 URL: https://issues.apache.org/jira/browse/FLINK-30812
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Mate Czagany


In HADOOP-17139 S3AFileSystem#copyFromLocalFile was refactored and expects the 
local source Hadoop Path object to have a scheme specified which the 
YarnClusterDescriptor uploading the local files won't have.

When uploading files to S3 CopyFromLocalOperation#getFinalPath compares the 
passed source Hadoop Path with the file it found(which will have file:// 
scheme) using URI.relativize but it will fail because of the scheme difference 
and throw PathIOException as can be seen in this exception:

 
{code:java}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]        ..
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for 
URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
 Input/output error
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[hadoop-common-3.3.3.jar!/:?]
at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
 ~[hadoop-common-3.3.3.jar!/:?]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]
... 35 more {code}
 

The possibly easiest solution would be to somehow add the file:// scheme in 
YarnApplicationFileUploader#copyToRemoteApplicationDir

The other solution would be to change all calls uploading local files to use 
"new Path(file.toURI())" instead of "new Path(file.getAbsolutePath())" but it 
might not be as future-proof as the other solution

Email thread: [https://lists.apache.org/thread/oo5rlyo3jr7kds2y6wwnfo1yhnk0fx4c]

 

If a committer can assign this ticket to me I can start working on this



--
This message was sent by Atlassian Jira
(v8.20.10#820010)