I'm trying to submit a PyFlink job to Kubernetes, following the steps in this article [1] and using the pyflink image [2]; the Flink version is 1.15.2. Submitting the WordCount jar job works fine, but the PyFlink job fails with the log shown below. The Python installed on my local machine (3.7.9) matches the version inside the pyflink image. Could this be a pickle package version problem? How do I check which pickle version is currently in use, which version is expected, and how do I install the expected version?
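For comparing the two environments, here is a minimal sketch you could run both on the submitting machine and inside the pyflink container. Note that pickle ships with the interpreter itself, so the highest protocol it supports is fixed by the Python version (protocol 5 requires Python 3.8+), and PyFlink actually serializes UDFs via cloudpickle, whose version can be checked separately:

```python
# Sketch: run on both the client machine and inside the pyflink container,
# then compare the outputs. pickle is part of the standard library, so
# there is no separately installed "pickle package" to upgrade; the
# protocol support follows the interpreter version.
import pickle
import sys

print(sys.version.split()[0])     # interpreter version, e.g. 3.7.9
print(pickle.HIGHEST_PROTOCOL)    # 4 on Python 3.7, 5 on Python 3.8+
```

To see the cloudpickle version on either side, `pip show cloudpickle` (or `python -m pip show cloudpickle`) should work in both environments.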
./bin/flink run \
  -m localhost:8081 \
  -py ./examples/python/table/word_count.py

2023-04-15 16:52:27
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
    ... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
    ... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
    instruction_id, request.process_bundle_descriptor_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
    self.data_channel_factory)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
    table_operations.TableFunctionOperation)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 206, in _create_user_defined_function_operation
    internal_operation_cls)
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 131, in __init__
    super(TableFunctionOperation, self).__init__(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 141, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 143, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
ValueError: unsupported pickle protocol: 5

    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
    ... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
    instruction_id, request.process_bundle_descriptor_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
    self.data_channel_factory)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
    table_operations.TableFunctionOperation)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 206, in _create_user_defined_function_operation
    internal_operation_cls)
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 131, in __init__
    super(TableFunctionOperation, self).__init__(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
    self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 141, in generate_func
    operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 143, in extract_user_defined_function
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
  File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
    return cloudpickle.loads(payload)
ValueError: unsupported pickle protocol: 5

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    ... 3 more

[1] https://blog.devgenius.io/playing-pyflink-from-scratch-65c18908c366
[2] https://hub.docker.com/r/wirelessr/pyflink
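One note on the bottom line of the trace: "unsupported pickle protocol: 5" refers to the pickle wire-format protocol number (protocol 5 was added in Python 3.8), not to a separately versioned pickle package. As an illustration of how to see which protocol a given payload was written with: every pickle produced with protocol 2 or higher starts with the PROTO opcode (0x80) followed by the protocol number, so the second byte of a payload identifies it.

```python
# Sketch: inspect which pickle protocol produced a payload.
import pickle

payload = pickle.dumps({"word": "count"}, protocol=pickle.HIGHEST_PROTOCOL)
assert payload[0] == 0x80   # PROTO opcode, present for protocol >= 2
print(payload[1])           # the protocol number of this payload
```

If the payload shipped to the worker carries protocol 5, it suggests the serializing side (the client's Python/cloudpickle) is newer than the Python 3.7 in the image, which can read at most protocol 4.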