Here are additional exceptions showing the same error, but on different files. PyFlink lib error:
java.lang.RuntimeException: An error occurred while copying the file. at org.apache.flink.api.common.cache.DistributedCache.getFile( DistributedCache.java:158) at org.apache.flink.python.env.PythonDependencyInfo.create( PythonDependencyInfo.java:151) at org.apache.flink.streaming.api.operators.python.process. AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager( AbstractExternalPythonFunctionOperator.java:124) at org.apache.flink.table.runtime.operators.python.aggregate. AbstractPythonStreamAggregateOperator.createPythonFunctionRunner( AbstractPythonStreamAggregateOperator.java:176) at org.apache.flink.streaming.api.operators.python.process. AbstractExternalPythonFunctionOperator.open( AbstractExternalPythonFunctionOperator.java:56) at org.apache.flink.table.runtime.operators.python.aggregate. AbstractPythonStreamAggregateOperator.open( AbstractPythonStreamAggregateOperator.java:160) at org.apache.flink.table.runtime.operators.python.aggregate. AbstractPythonStreamGroupAggregateOperator.open( AbstractPythonStreamGroupAggregateOperator.java:116) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain .initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates( StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call( StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal( StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask .java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring( Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.nio.file.FileAlreadyExistsException: File already 
exists: /tmp/flink-dist-cache-6c6899b4-694e-43b7-a081-f58ade99f212/9572e aa67fa026c8cfa1ebd5435a5c29/plugin_directory/venv/lib/python3.8 /site-packages/pyflink/lib/flink-json-1.17.0.jar at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem .java:257) at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536) at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( FileCache.java:289) at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( FileCache.java:261) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors .java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent. ScheduledThreadPoolExecutor$ScheduledFutureTask.run( ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:628) ... 1 more Cpython file error: java.lang.RuntimeException: An error occurred while copying the file. at org.apache.flink.api.common.cache.DistributedCache.getFile( DistributedCache.java:158) at org.apache.flink.python.env.PythonDependencyInfo.create( PythonDependencyInfo.java:151) at org.apache.flink.streaming.api.operators.python.process. AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager( AbstractExternalPythonFunctionOperator.java:124) at org.apache.flink.table.runtime.operators.python.aggregate. AbstractPythonStreamAggregateOperator.createPythonFunctionRunner( AbstractPythonStreamAggregateOperator.java:176) at org.apache.flink.streaming.api.operators.python.process. AbstractExternalPythonFunctionOperator.open( AbstractExternalPythonFunctionOperator.java:56) at org.apache.flink.table.runtime.operators.python.aggregate. 
AbstractPythonStreamAggregateOperator.open( AbstractPythonStreamAggregateOperator.java:160) at org.apache.flink.table.runtime.operators.python.aggregate. AbstractPythonStreamGroupAggregateOperator.open( AbstractPythonStreamGroupAggregateOperator.java:116) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain .initializeStateAndOpenOperators(RegularOperatorChain.java:107) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates( StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call( StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal( StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask .java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring( Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.nio.file.FileAlreadyExistsException: File already exists: /tmp/flink-dist-cache-ccec7705-40e9-4c8f-952b-1b26f58fd517/9572e aa67fa026c8cfa1ebd5435a5c29/plugin_directory/venv/lib/python3.8 /site-packages/numpy/__pycache__/__config__.cpython-38.pyc at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem .java:257) at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536) at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( FileCache.java:289) at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( FileCache.java:261) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors .java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent. 
ScheduledThreadPoolExecutor$ScheduledFutureTask.run( ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:628) ... 1 more On Mon, Oct 23, 2023 at 11:47 AM Ashish Khatkar <akhat...@yelp.com> wrote: > Hi, > > We are using the Flink 1.17.0 Table API with RocksDB as the backend to provide a > service that lets our users run SQL queries. The tables are created using an > Avro schema, and we also allow users to attach a Python UDF as a plugin. > This plugin is downloaded at the time of building the table, and we update > the StreamTableEnvironment with the python.files, python.archives, > python.client.executable and python.executable. > > We use restart policies in case an unexpected failure happens. Recently we have > been observing issues where the job fails to recover with the error > >> Caused by: java.nio.file.FileAlreadyExistsException: File already >> exists: /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572e >> aa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip > > and the only way for us to recover is to restart the cluster. > > Does anyone have any idea how to fix this? > This is the full stack trace: > > java.lang.RuntimeException: An error occurred while copying the file. > at org.apache.flink.api.common.cache.DistributedCache.getFile( > DistributedCache.java:158) > at org.apache.flink.python.env.PythonDependencyInfo.create( > PythonDependencyInfo.java:151) > at org.apache.flink.streaming.api.operators.python.process. > AbstractExternalPythonFunctionOperator.createPythonEnvironmentManager( > AbstractExternalPythonFunctionOperator.java:124) > at org.apache.flink.table.runtime.operators.python.aggregate. > AbstractPythonStreamAggregateOperator.createPythonFunctionRunner( > AbstractPythonStreamAggregateOperator.java:176) > at org.apache.flink.streaming.api.operators.python.process. 
> AbstractExternalPythonFunctionOperator.open( > AbstractExternalPythonFunctionOperator.java:56) > at org.apache.flink.table.runtime.operators.python.aggregate. > AbstractPythonStreamAggregateOperator.open( > AbstractPythonStreamAggregateOperator.java:160) > at org.apache.flink.table.runtime.operators.python.aggregate. > AbstractPythonStreamGroupAggregateOperator.open( > AbstractPythonStreamGroupAggregateOperator.java:116) > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain > .initializeStateAndOpenOperators(RegularOperatorChain.java:107) > at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates( > StreamTask.java:734) > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > .call(StreamTaskActionExecutor.java:55) > at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal( > StreamTask.java:709) > at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask > .java:675) > at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring( > Task.java:952) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java: > 921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: java.nio.file.FileAlreadyExistsException: File already exists: > /tmp/flink-dist-cache-8bad2720-fd10-4511-9c80-cbb3ceb86bbf/9572e > aa67fa026c8cfa1ebd5435a5c29/plugin_directory/plugin_name.zip > at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem > .java:257) > at org.apache.flink.util.FileUtils.expandDirectory(FileUtils.java:536) > at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( > FileCache.java:289) > at org.apache.flink.runtime.filecache.FileCache$CopyFromBlobProcess.call( > FileCache.java:261) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors > .java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.util.concurrent. > ScheduledThreadPoolExecutor$ScheduledFutureTask.run( > ScheduledThreadPoolExecutor.java:304) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1128) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:628) > ... 1 more >