Here are additional exceptions with the same error, but on different files:

PyFlink lib error:

java.lang.RuntimeException: An error occurred while copying the file.
at org.apache.flink.api.common.cache.DistributedCache.getFile(
DistributedCache.java:158)
at org.apache.flink.python.env.PythonDependencyInfo.create(
PythonDependencyInfo.java:151)
at org.apache.flink.streaming.api.operators.python.process.
AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager(
AbstractExternalPythonFunctionOperator.java:124)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamAggregateOperator.createPythonFunctionRunner(
AbstractPythonStreamAggregateOperator.java:176)
at org.apache.flink.streaming.api.operators.python.process.
AbstractExternalPythonFunctionOperator.open(
AbstractExternalPythonFunctionOperator.java:56)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamAggregateOperator.open(
AbstractPythonStreamAggregateOperator.java:160)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamGroupAggregateOperator.open(
AbstractPythonStreamGroupAggregateOperator.java:116)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(
StreamTask.java:734)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(
StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(
StreamTask.java:709)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask
.java:675)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.file.FileAlreadyExistsException: File already exists:
/tmp/flink-dist-cache-6c6899b4-694e-43b7-a081-f58ade99f212/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/venv/lib/python3.8/site-packages/pyflink/lib/flink-json-1.17.0.jar
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem
.java:257)
at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536)
at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
FileCache.java:289)
at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
FileCache.java:261)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors
.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.
ScheduledThreadPoolExecutor$ScheduledFutureTask.run(
ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:628)
... 1 more



CPython file error:

java.lang.RuntimeException: An error occurred while copying the file.
at org.apache.flink.api.common.cache.DistributedCache.getFile(
DistributedCache.java:158)
at org.apache.flink.python.env.PythonDependencyInfo.create(
PythonDependencyInfo.java:151)
at org.apache.flink.streaming.api.operators.python.process.
AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager(
AbstractExternalPythonFunctionOperator.java:124)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamAggregateOperator.createPythonFunctionRunner(
AbstractPythonStreamAggregateOperator.java:176)
at org.apache.flink.streaming.api.operators.python.process.
AbstractExternalPythonFunctionOperator.open(
AbstractExternalPythonFunctionOperator.java:56)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamAggregateOperator.open(
AbstractPythonStreamAggregateOperator.java:160)
at org.apache.flink.table.runtime.operators.python.aggregate.
AbstractPythonStreamGroupAggregateOperator.open(
AbstractPythonStreamGroupAggregateOperator.java:116)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(
StreamTask.java:734)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(
StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(
StreamTask.java:709)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask
.java:675)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.nio.file.FileAlreadyExistsException: File already exists:
/tmp/flink-dist-cache-ccec7705-40e9-4c8f-952b-1b26f58fd517/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/venv/lib/python3.8/site-packages/numpy/__pycache__/__config__.cpython-38.pyc
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem
.java:257)
at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536)
at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
FileCache.java:289)
at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
FileCache.java:261)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors
.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.
ScheduledThreadPoolExecutor$ScheduledFutureTask.run(
ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:628)
... 1 more


On Mon, Oct 23, 2023 at 11:47 AM Ashish Khatkar <akhat...@yelp.com> wrote:

> Hi,
>
> We are using flink-1.17.0 table API and RocksDB as backend to provide a
> service to our users to run sql queries. The tables are created using the
> Avro schema, and we also allow users to attach a Python UDF as a plugin.
> This plugin is downloaded at the time of building the table and we update
> the StreamTableEnvironment with the python.files, python.archives,
> python.client.executable and python.executable.
>
> We use restart policies in case an unexpected failure happens. We are
> recently observed issues where the job fails to recover with the error:
>
>> Caused by: java.nio.file.FileAlreadyExistsException: File already
>> exists: /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572e
>> aa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip
>
> The only way for us to recover is to restart the cluster.
>
> Does anyone have any idea on how to fix this?
> This is the full stacktrace
>
> java.lang.RuntimeException: An error occurred while copying the file.
> at org.apache.flink.api.common.cache.DistributedCache.getFile(
> DistributedCache.java:158)
> at org.apache.flink.python.env.PythonDependencyInfo.create(
> PythonDependencyInfo.java:151)
> at org.apache.flink.streaming.api.operators.python.process.
> AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager(
> AbstractExternalPythonFunctionOperator.java:124)
> at org.apache.flink.table.runtime.operators.python.aggregate.
> AbstractPythonStreamAggregateOperator.createPythonFunctionRunner(
> AbstractPythonStreamAggregateOperator.java:176)
> at org.apache.flink.streaming.api.operators.python.process.
> AbstractExternalPythonFunctionOperator.open(
> AbstractExternalPythonFunctionOperator.java:56)
> at org.apache.flink.table.runtime.operators.python.aggregate.
> AbstractPythonStreamAggregateOperator.open(
> AbstractPythonStreamAggregateOperator.java:160)
> at org.apache.flink.table.runtime.operators.python.aggregate.
> AbstractPythonStreamGroupAggregateOperator.open(
> AbstractPythonStreamGroupAggregateOperator.java:116)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
> .initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(
> StreamTask.java:734)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .call(StreamTaskActionExecutor.java:55)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(
> StreamTask.java:709)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask
> .java:675)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
> Task.java:952)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:
> 921)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.nio.file.FileAlreadyExistsException: File already exists:
> /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572eaa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip
> at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem
> .java:257)
> at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536)
> at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
> FileCache.java:289)
> at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call(
> FileCache.java:261)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors
> .java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.
> ScheduledThreadPoolExecutor$ScheduledFutureTask.run(
> ScheduledThreadPoolExecutor.java:304)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:628)
> ... 1 more
>

Reply via email to