I've attached some minimal sample code below that reproduces this issue. It works perfectly with the DirectRunner.
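(As an aside, the sample hard-codes its pipeline options. A minimal sketch of the same pipeline taking its options from the command line instead, relying on Beam's PipelineOptions parsing sys.argv when no flags are passed, would look roughly like the following; "repro.py" is just a made-up file name. This makes it easy to flip between DirectRunner and FlinkRunner while testing:)

#!/usr/bin/env python3
# Hypothetical repro.py: same pipeline, but the runner and options come from
# the command line instead of being hard-coded, e.g.
#   python3 repro.py --runner=DirectRunner --streaming
#   python3 repro.py --runner=FlinkRunner --flink_version=1.9 \
#       --flink_submit_uber_jar --streaming --save_main_session
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def main():
    # With no flags argument, PipelineOptions parses sys.argv.
    popts = PipelineOptions()
    p = beam.Pipeline(options=popts)
    (
        p
        | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(topic="<valid topic>")
        | beam.Map(print)
    )
    p.run().wait_until_finish()


if __name__ == "__main__":
    main()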
-Pradip

#!/usr/bin/env python3
import apache_beam as beam
import logging
import os


class DummyPipeline(beam.PTransform):
    def expand(self, p):
        (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(topic="<valid topic>")
            | beam.Map(print)
        )
        return p


def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    print(popts)
    p = beam.Pipeline(options=popts)
    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()


if __name__ == "__main__":
    main()

On Sun, Jun 7, 2020 at 14:57, Pradip Thachile <pra...@thachile.com> wrote:
> Hey folks,
>
> I've got a Beam/Python pipeline that works on the DirectRunner, and I'm now
> trying to run it on a local dev Flink cluster. Running it yields an error
> right out of the gate about not being able to deserialize UnboundedSource
> (my PubSub source). I'm not sure how to debug this and would love some
> feedback on how to solve it.
>
> Beam SDK: 2.19
> Flink: 1.9.3
> Python: 3.7
> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', '--flink_submit_uber_jar', '--streaming']
> (Stack trace below)
>
> -Pradip
>
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
> java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Traceback (most recent call last):
>   File "bin/run-pipeline.py", line 70, in <module>
>     main()
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
>     return self.main(*args, **kwargs)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
>     rv = self.invoke(ctx)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
>     return ctx.invoke(self.callback, **ctx.params)
>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
>     return callback(*args, **kwargs)
>   File "bin/run-pipeline.py", line 64, in main
>     job = runner.run(pipeline=pipeline)
>   File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
>     result = dag.run()
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
>     return super(FlinkRunner, self).run_pipeline(pipeline, options)
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
>     result.wait_until_finish()
>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)