You are using a proprietary connector that only works on Dataflow. You
will have to use apache_beam.io.external.gcp.pubsub.ReadFromPubSub
instead. PubSub support in the Python SDK is experimental.
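
For illustration, switching to the external transform might look roughly
like the sketch below. It is untested and rests on assumptions: the helper
names flink_flags and build_pipeline are made up for this example, the
expansion_service keyword is assumed from the experimental external module
and may differ between Beam releases, and the flags are copied from the
original report. Whether --flink_submit_uber_jar works together with the
external transform has not been verified here. The Beam imports are
deferred into build_pipeline because constructing an external transform
needs a reachable expansion service.

```python
def flink_flags(flink_version="1.9"):
    """Return the FlinkRunner flags from the original report.

    Hypothetical helper, not part of Beam.
    """
    return [
        "--runner=FlinkRunner",
        "--flink_version={}".format(flink_version),
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]


def build_pipeline(topic, flags, expansion_service=None):
    """Build a pipeline that reads PubSub through the external transform.

    Hypothetical helper. `expansion_service` is passed straight through to
    the transform; leaving it as None relies on whatever default the
    installed Beam version uses, which is an assumption of this sketch.
    """
    # Imported lazily so the flag helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.io.external.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions(flags=flags))
    (
        p
        | "Read from PS" >> ReadFromPubSub(
            topic=topic, expansion_service=expansion_service)
        | "Print" >> beam.Map(print)
    )
    return p
```

Calling build_pipeline("<valid topic>", flink_flags()).run() would then
submit the job, assuming an expansion service is reachable at pipeline
construction time.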

-Max

On 09.06.20 06:40, Pradip Thachile wrote:
> Quick update: this test code works just fine on Dataflow as well as the 
> DirectRunner. Looks like the FlinkRunner is problematic for some reason here.
> 
> On 2020/06/08 20:11:13, Pradip Thachile <pra...@thachile.com> wrote: 
>> Hey folks, 
>>
>> I posted this on the Flink user mailing list but didn't get any traction 
>> there (potentially since this is Beam related?). I've got a Beam/Python 
>> pipeline that works on the DirectRunner and now am trying to run this on a 
>> local dev Flink cluster. Running this yields an error out the gate around 
>> not being able to deserialize UnboundedSource (my PubSub source). I'm not 
>> sure how to debug this and would love to get some feedback on how to solve 
>> this issue. I'm also adding in a simple example that reproduces this error.
>>
>> Beam SDK: 2.19
>> Flink: 1.9.3
>> Python: 3.7
>> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', 
>> '--flink_submit_uber_jar', '--streaming']
>> (Stacktrace below)
>>
>> #!/usr/bin/env python3
>> import apache_beam as beam
>>
>> class DummyPipeline(beam.PTransform):
>>     def expand(self, p):
>>         # Return the resulting PCollection rather than the input.
>>         return (
>>             p
>>             | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
>>                 topic="<valid topic>")
>>             | beam.Map(print)
>>         )
>>
>> def main():
>>     beam_options = [
>>         # "--runner=DirectRunner",
>>         "--runner=FlinkRunner",
>>         "--flink_version=1.9",
>>         "--flink_submit_uber_jar",
>>         "--streaming",
>>         "--save_main_session",
>>     ]
>>     popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
>>     p = beam.Pipeline(options=popts)
>>
>>     (
>>         p
>>         | "Do It" >> DummyPipeline()
>>     )
>>     job = p.run()
>>     job.wait_until_finish()
>>
>> if __name__ == "__main__":
>>     main()
>>
>> -Pradip
>>
>> [main] INFO 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
>> ArtifactStagingService started on localhost:55371
>> [main] INFO 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java 
>> ExpansionService started on localhost:55372
>> [main] INFO 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
>> JobService started on localhost:55364
>> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker 
>> - Invoking job 
>> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f 
>> with pipeline runner 
>> org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
>> [grpc-default-executor-0] INFO 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting 
>> job invocation 
>> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
>> [flink-runner-job-invoker] INFO 
>> org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to 
>> Flink program.
>> [flink-runner-job-invoker] INFO 
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a 
>> Streaming Environment.
>> [flink-runner-job-invoker] ERROR 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error 
>> during job invocation 
>> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
>> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>>     at 
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>     at 
>> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>>     at 
>> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>>     at 
>> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>>     at 
>> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>>     at 
>> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>>     at 
>> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>>     at 
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>>     at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>>     at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>>     at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>>     at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>>     at 
>> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>>     at 
>> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>>     at 
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>>     ... 14 more
>> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>> [flink-runner-job-invoker] WARN 
>> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - 
>> Failed to remove job staging directory for token 
>> {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}:
>>  {}
>> java.io.FileNotFoundException: 
>> /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST
>>  (No such file or directory)
>>     at java.io.FileInputStream.open0(Native Method)
>>     at java.io.FileInputStream.open(FileInputStream.java:195)
>>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
>>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
>>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>     at 
>> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
>>     at 
>> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
>>     at 
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
>>     at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
>>     at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Traceback (most recent call last):
>>   File "bin/run-pipeline.py", line 70, in <module>
>>     main()
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", 
>> line 829, in __call__
>>     return self.main(*args, **kwargs)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", 
>> line 782, in main
>>     rv = self.invoke(ctx)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", 
>> line 1066, in invoke
>>     return ctx.invoke(self.callback, **ctx.params)
>>   File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", 
>> line 610, in invoke
>>     return callback(*args, **kwargs)
>>   File "bin/run-pipeline.py", line 64, in main
>>     job = runner.run(pipeline=pipeline)
>>   File 
>> "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py",
>>  line 42, in run
>>     result = dag.run()
>>   File "<redacted venv 
>> location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in 
>> run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "<redacted venv 
>> location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py",
>>  line 47, in run_pipeline
>>     return super(FlinkRunner, self).run_pipeline(pipeline, options)
>>   File "<redacted venv 
>> location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 334, in run_pipeline
>>     result.wait_until_finish()
>>   File "<redacted venv 
>> location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>>  line 455, in wait_until_finish
>>     self._job_id, self._state, self._last_error_message()))
>> RuntimeError: Pipeline 
>> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f 
>> failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>
>>
