I've attached some minimal sample code below that reproduces this issue.
It works perfectly with the DirectRunner.

-Pradip

#!/usr/bin/env python3
import apache_beam as beam
import logging
import os

class DummyPipeline(beam.PTransform):
    def expand(self, p):
        # Read from Pub/Sub and print each element.
        return (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
                topic="<valid topic>")
            | beam.Map(print)
        )

def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        '--save_main_session',
    ]
    popts = beam.options.pipeline_options.PipelineOptions(
        flags=beam_options)
    print(popts)
    p = beam.Pipeline(options=popts)

    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()

if __name__ == "__main__":
    main()
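
(For what it's worth, the same options can also be built with keyword arguments
rather than a flag list; the sketch below should be equivalent to the
beam_options list above, and can be handy for printing and inspecting what the
runner actually sees.)

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to the flag list above, expressed as keyword arguments.
popts = PipelineOptions(
    runner="FlinkRunner",
    flink_version="1.9",
    flink_submit_uber_jar=True,
    streaming=True,
    save_main_session=True,
)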

On Sun, Jun 7, 2020 at 14:57, Pradip Thachile <pra...@thachile.com> wrote:

> Hey folks,
>
> I've got a Beam/Python pipeline that works on the DirectRunner, and I'm now
> trying to run it on a local dev Flink cluster. It fails right out of the
> gate with an error about being unable to deserialize the UnboundedSource
> (my Pub/Sub source). I'm not sure how to debug this and would love some
> feedback on how to solve it.
>
> Beam SDK: 2.19
> Flink: 1.9.3
> Python: 3.7
> Beam args: ['--runner=FlinkRunner', '--flink_version=1.9',
> '--flink_submit_uber_jar', '--streaming']
> (Stacktrace below)
>
> -Pradip
>
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> ArtifactStagingService started on localhost:55371
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
> ExpansionService started on localhost:55372
> [main] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
> JobService started on localhost:55364
> [grpc-default-executor-0] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
> with pipeline runner
> org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
> [grpc-default-executor-0] INFO
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting
> job invocation
> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
> [flink-runner-job-invoker] INFO
> org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to
> Flink program.
> [flink-runner-job-invoker] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-invoker] ERROR
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
> during job invocation
> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
> java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
>     at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>     at
> org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
>     at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
>     at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
>     at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
>     at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
>     at
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
>     at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
>     at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>     at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>     at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>     at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>     at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>     at
> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>     at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>     ... 14 more
> ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [flink-runner-job-invoker] WARN
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService -
> Failed to remove job staging directory for token
> {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}:
> {}
> java.io.FileNotFoundException:
> /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST
> (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at
> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>     at
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
>     at
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
>     at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
>     at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Traceback (most recent call last):
>   File "bin/run-pipeline.py", line 70, in <module>
>     main()
>   File "<redacted venv
> location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
>     return self.main(*args, **kwargs)
>   File "<redacted venv
> location>/lib/python3.7/site-packages/click/core.py", line 782, in main
>     rv = self.invoke(ctx)
>   File "<redacted venv
> location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
>     return ctx.invoke(self.callback, **ctx.params)
>   File "<redacted venv
> location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
>     return callback(*args, **kwargs)
>   File "bin/run-pipeline.py", line 64, in main
>     job = runner.run(pipeline=pipeline)
>   File
> "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py",
> line 42, in run
>     result = dag.run()
>   File "<redacted venv
> location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474,
> in run
>     return self.runner.run_pipeline(self, self._options)
>   File "<redacted venv
> location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py",
> line 47, in run_pipeline
>     return super(FlinkRunner, self).run_pipeline(pipeline, options)
>   File "<redacted venv
> location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 334, in run_pipeline
>     result.wait_until_finish()
>   File "<redacted venv
> location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 455, in wait_until_finish
>     self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
> failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>
