Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-25 Thread Maxim Senin via user
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
    at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise.
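
For context on the hint in that error message: a pointwise (forward/rescale) connection ties the parallelism of the producer and consumer together, so the autoscaler cannot rescale one side on its own, while an explicit redistributing exchange breaks that coupling. A minimal sketch of adding such an exchange (the job shape and names are purely illustrative, not taken from the failing job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.fromElements("a", "b", "c");

        // By default this map is connected FORWARD to the source, so the
        // two operators cannot be rescaled independently of each other.
        DataStream<String> parsed = source.map(String::toUpperCase);

        // An explicit shuffle() (rebalance() also works) inserts a
        // redistributing exchange, as the error message suggests.
        parsed.shuffle()
              .map(s -> s + "!")
              .print();

        env.execute("explicit-shuffle-sketch");
    }
}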

Cheers,
Maxim

Re: CSV format and hdfs

2024-04-25 Thread Robert Young
Hi Artem,

I had a debug of Flink 1.17.1 (running CsvFilesystemBatchITCase) and I see
the same behaviour. It's the same on master too. Jackson flushes [1] the
underlying stream after every `writeValue` call. I experimented with
disabling the flush via Jackson's FLUSH_PASSED_TO_STREAM [2] feature, but
this broke the integration tests. This is because Jackson wraps the stream
in its own Writer that buffers data; we depend on the flush to flush the
Jackson writer and eventually write the bytes to the stream.

One workaround I found [3] is to wrap the stream in an implementation that
ignores flush calls and pass that to Jackson (a sketch follows the links
below). Jackson will then flush its writer buffers and write the bytes to
the underlying stream, and its attempt to flush the underlying stream
becomes a no-op. The CsvBulkWriter will continue to flush/sync the
underlying stream itself. Unfortunately this required code changes in
flink-csv, so it might not be helpful for you.

1.
https://github.com/FasterXML/jackson-dataformats-text/blob/8700b5489090f81b4b8d2636f9298ac47dbf14a3/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/CsvGenerator.java#L504
2.
https://fasterxml.github.io/jackson-core/javadoc/2.13/com/fasterxml/jackson/core/JsonGenerator.Feature.html#FLUSH_PASSED_TO_STREAM
3.
https://github.com/robobario/flink/commit/ae3fdb1ca9de748df791af232bba57d6d7289a79
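
For readers who want the shape of that workaround, here is a minimal flush-ignoring wrapper (a sketch only; the class name is hypothetical, and the actual change in [3] lives inside Flink's CSV writer):

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of the workaround described above: swallow flush() so Jackson's
// per-record flush becomes a no-op, while the owner of the underlying
// stream still decides when a real flush/sync happens.
public class FlushIgnoringOutputStream extends FilterOutputStream {

    public FlushIgnoringOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate directly; FilterOutputStream would otherwise copy byte by byte.
        out.write(b, off, len);
    }

    @Override
    public void flush() {
        // Deliberately a no-op: Jackson's flush after each writeValue()
        // no longer reaches the underlying stream.
    }
}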

Rob Young


Best serialization performance for `Objects`

2024-04-25 Thread Salva Alcántara
I know that an `Object` is treated as a generic data type by Flink and
hence serialized using Kryo. I wonder if there is anything one can do to
improve performance w.r.t. the Kryo-based serializer, or if that is
simply an inherent worst-case scenario and nothing can be done without
actually switching to another (more specific, well-supported) data type,
e.g., `String`.
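
As a quick way to observe the fallback this refers to, Flink's type extraction reports `Object` as a generic type (a minimal check; the expected output is noted in the comments):

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KryoFallbackCheck {
    public static void main(String[] args) {
        // Object cannot be analyzed as a POJO, so Flink falls back to
        // GenericTypeInfo, whose serializer is Kryo-based.
        System.out.println(TypeInformation.of(Object.class)); // GenericType<java.lang.Object>

        // String gets a dedicated serializer instead of Kryo.
        System.out.println(TypeInformation.of(String.class)); // String
    }
}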

Regards,

Salva


Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-25 Thread Maxim Senin via user
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, tries to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (setting replicas = 0 in the newly generated config).

However, the deployment never comes back up, apparently due to an exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
    at org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher [ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event processing ExecutionScope{ resource id: ResourceID{name='f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'}, version: 50606957} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED",

Caused by: org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status have been modified externally in version 50607043 Previous: {"jobStatus":{"jobName":"autoscaling test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
    at org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:63)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:279)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:156)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:171)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)
    ... 13 more

How can this be fixed? Why is the deployment not coming back up after this exception? 
Is there a configuration property to set the number of retries?

Thanks,
Maxim





Flink SQL Client does not start job with savepoint

2024-04-25 Thread Lee, Keith
Hi,

Referring to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint

I’ve followed the instructions, however I do not see evidence of the job being 
started from the savepoint. See the SQL statement excerpt below:


Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;

+-------------------------------------------------------------+
|                                              savepoint path |
+-------------------------------------------------------------+
|    file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |
+-------------------------------------------------------------+

…

Flink SQL> CREATE TABLE Orders (order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3)) WITH ('connector' = 'datagen');

[INFO] Execute statement succeed.



Flink SQL> CREATE TABLE OrdersBlackhole (order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3)) WITH ('connector' = 'blackhole');

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 6969725a69ecc967aac2ce3eedcc274a





Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';

[INFO] Execute statement succeed.



Flink SQL> SET 'execution.state-recovery.path' = 'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 7881d53d28751f9bbbd3581976d9fe3d



I have attempted the path both with and without the file:// and file:/ prefixes. 
Additionally, I’ve also attempted the following in config.yml:


state.savepoints.dir: file:///tmp/flink-savepoints/
state.checkpoints.dir: file:///tmp/flink-checkpoints/

Am I missing something? The jobmanager log did not indicate a start from a savepoint.



Received JobGraph submission 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
Submitting job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a was granted leadership with leader id ----. Creating new JobMasterServiceProcess.
Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at pekko://flink/user/rpc/jobmanager_4 .
Initializing job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
Using restart back off time strategy NoRestartBackoffTimeStrategy for insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job 6969725a69ecc967aac2ce3eedcc274a.
Running initialization on master for job insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
Successfully ran initialization on master in 0 ms.
Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
State backend loader loads the state backend as HashMapStateBackend
Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
No checkpoint found during restore.
Using failover strategy org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy@7db68f8f for insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
Starting execution of job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a) under job master id .
Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
Job insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a) switched from state CREATED to RUNNING.



Thanks in advance,
Keith



Re: Flink SQL Client does not start job with savepoint

2024-04-25 Thread Lee, Keith
Apologies, I have included the jobmanager log for 6969725a69ecc967aac2ce3eedcc274a 
instead of 7881d53d28751f9bbbd3581976d9fe3d; however, they looked exactly the same.

I can include it if necessary.

Thanks
Keith

From: "Lee, Keith" 
Date: Thursday, 25 April 2024 at 21:41
To: "user@flink.apache.org" 
Subject: Flink SQL Client does not start job with savepoint

Hi,

Referring to 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint

I’ve followed the instruction however I do not see evidence of the job being 
started with savepoint. See SQL statements excerpt below:


Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;

+--+

|   savepoint path |

+--+

| file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |

+--+

…

Flink SQL> CREATE TABLE Orders (order_number BIGINT,price DECIMAL(32,2),buyer 
ROW,order_time TIMESTAMP(3)) WITH 
('connector' = 'datagen');

[INFO] Execute statement succeed.



Flink SQL> CREATE TABLE OrdersBlackhole (order_number BIGINT,price 
DECIMAL(32,2),buyer ROW,order_time 
TIMESTAMP(3)) WITH ('connector' = 'blackhole');

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 6969725a69ecc967aac2ce3eedcc274a





Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';

[INFO] Execute statement succeed.



Flink SQL> SET 'execution.state-recovery.path' = 
'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';

[INFO] Execute statement succeed.



Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;

[INFO] Submitting SQL update statement to the cluster...

[INFO] SQL update statement has been successfully submitted to the cluster:

Job ID: 7881d53d28751f9bbbd3581976d9fe3d



I have attempted with and without the prefix file:// and file:/. Additionally, 
I’ve also attempted the following in config.yml

state.savepoints.dir: file:///tmp/flink-savepoints/
state.checkpoints.dir: file:///tmp/flink-checkpoints/

Am I missing something? The jobmanager log did not indicate a start from 
savepoint.


Received JobGraph submission 
'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
Submitting job 'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a was 
granted leadership with leader id ----. 
Creating new JobMasterServiceProcess.
Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
pekko://flink/user/rpc/jobmanager_4 .
Initializing job 'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a).
Using restart back off time strategy NoRestartBackoffTimeStrategy for 
insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a).
Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job 
6969725a69ecc967aac2ce3eedcc274a.
Running initialization on master for job 
insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a).
Successfully ran initialization on master in 0 ms.
Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
State backend is set to heap memory 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
State backend loader loads the state backend as HashMapStateBackend
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
No checkpoint found during restore.
Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy@7db68f8f
 for insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a).
Starting execution of job 
'insert-into_default_catalog.default_database.OrdersBlackhole' 
(6969725a69ecc967aac2ce3eedcc274a) under job master id 
.
Starting scheduling with scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
Job insert-into_default_catalog.default_database.OrdersBlackhole 
(6969725a69ecc967aac2ce3eedcc274a) switched from state CREATED to RUNNING.


Thanks in advance,
Keith