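You are using a proprietary connector (beam.io.gcp.pubsub.ReadFromPubSub) which only works on Dataflow. On the FlinkRunner you will have to use apache_beam.io.external.gcp.pubsub.ReadFromPubSub instead. Note that PubSub support via this external transform is experimental from Python.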
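A minimal sketch of that swap, in case it helps. The two module paths are the ones mentioned in this thread; the helper names (pubsub_module_for, read_from_pubsub) and the rule "FlinkRunner gets the external transform, everything else the native one" are assumptions for illustration, not part of Beam:

```python
import importlib

# Module paths as named in this thread.
NATIVE_PUBSUB = "apache_beam.io.gcp.pubsub"
EXTERNAL_PUBSUB = "apache_beam.io.external.gcp.pubsub"


def pubsub_module_for(runner: str) -> str:
    """Return the module path of the Pub/Sub connector to use for a runner."""
    # Assumed rule for illustration: only the portable FlinkRunner needs the
    # external (cross-language) transform; other runners keep the native one.
    return EXTERNAL_PUBSUB if runner == "FlinkRunner" else NATIVE_PUBSUB


def read_from_pubsub(runner: str, topic: str):
    """Build a ReadFromPubSub transform from the connector suited to `runner`."""
    # Imported lazily so the helper above can be used without apache_beam;
    # this call itself requires apache_beam to be installed.
    module = importlib.import_module(pubsub_module_for(runner))
    return module.ReadFromPubSub(topic=topic)
```

In the pipeline below, the read step would then become something like `p | "Read from PS" >> read_from_pubsub("FlinkRunner", "<valid topic>")`. The external transform is expanded on the Java side, so it presumably relies on the Java ExpansionService that the job server log shows being started.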
-Max

On 09.06.20 06:40, Pradip Thachile wrote:
> Quick update: this test code works just fine on Dataflow as well as the
> DirectRunner. Looks like the FlinkRunner is problematic for some reason here.
>
> On 2020/06/08 20:11:13, Pradip Thachile <pra...@thachile.com> wrote:
>> Hey folks,
>>
>> I posted this on the Flink user mailing list but didn't get any traction
>> there (potentially since this is Beam related?). I've got a Beam/Python
>> pipeline that works on the DirectRunner and now am trying to run this on a
>> local dev Flink cluster. Running this yields an error out the gate around
>> not being able to deserialize UnboundedSource (my PubSub source). I'm not
>> sure how to debug this and would love to get some feedback on how to solve
>> this issue. I'm also adding in a simple example that reproduces this error.
>>
>> Beam SDK: 2.19
>> Flink: 1.9.3
>> Python: 3.7
>> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9',
>> '--flink_submit_uber_jar', '--streaming']
>> (Stacktrace below)
>>
>> #!/usr/bin/env python3
>> import apache_beam as beam
>>
>> class DummyPipeline(beam.PTransform):
>>     def expand(self, p):
>>         (
>>             p
>>             | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
>>                 topic="<valid topic>")
>>             | beam.Map(print)
>>         )
>>
>>         return p
>>
>> def main():
>>     beam_options = [
>>         # "--runner=DirectRunner",
>>         "--runner=FlinkRunner",
>>         "--flink_version=1.9",
>>         "--flink_submit_uber_jar",
>>         "--streaming",
>>         '--save_main_session',
>>     ]
>>     popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
>>     p = beam.Pipeline(options=popts)
>>
>>     (
>>         p
>>         | "Do It" >> DummyPipeline()
>>     )
>>     job = p.run()
>>     job.wait_until_finish()
>>
>> if __name__ == "__main__":
>>     main()
>>
>> -Pradip
>>
>> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
>> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
>> [main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
>> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
>> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
>> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
>> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>     at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>>     at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>     ... 14 more
>> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>> [flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
>> java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
>>     at java.io.FileInputStream.open0(Native Method)
>>     at java.io.FileInputStream.open(FileInputStream.java:195)
>>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
>>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
>>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
>>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
>>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Traceback (most recent call last):
>>   File "bin/run-pipeline.py", line 70, in <module>
>>     main()
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
>>     return self.main(*args, **kwargs)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
>>     rv = self.invoke(ctx)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
>>     return ctx.invoke(self.callback, **ctx.params)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
>>     return callback(*args, **kwargs)
>>   File "bin/run-pipeline.py", line 64, in main
>>     job = runner.run(pipeline=pipeline)
>>   File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
>>     result = dag.run()
>>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
>>     return super(FlinkRunner, self).run_pipeline(pipeline, options)
>>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
>>     result.wait_until_finish()
>>   File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
>>     self._job_id, self._state, self._last_error_message()))
>> RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)