Try adding "--experiments=beam_fn_api" to your pipeline options. (This is a
known issue with Beam 2.15 that will be fixed in 2.16.)
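
For illustration, here is a minimal sketch of how the flag could be added to the options from the snippet quoted below. The helper names build_flink_args and run are hypothetical, introduced only for this example; the other flags are copied from the original snippet, and I have not run this against your cluster.

```python
def build_flink_args(enable_fn_api=True):
    """Return the pipeline flags from the quoted snippet, optionally with the workaround."""
    args = [
        "--runner=FlinkRunner",
        "--flink_version=1.8",
        "--flink_master_url=localhost:8081",
    ]
    if enable_fn_api:
        # Suggested workaround for Beam 2.15; expected to be unnecessary in 2.16.
        args.append("--experiments=beam_fn_api")
    return args


def run():
    """Build and run the one-step pipeline from the quoted snippet (needs apache-beam)."""
    # Imported here so the flag helper above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(build_flink_args())
    with beam.Pipeline(options=options) as p:
        p | beam.Create(["Hello World"])
```

Calling run() submits the job. Once you are on 2.16, build_flink_args(enable_fn_api=False) should give back your original flags.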

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Hello.
>
> I am trying to spin up the Flink runner, but it looks like data
> serialization is failing.
> I would like to ask for help getting past this error.
>
> ========================================================================
> [flink-runner-job-invoker] ERROR
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
> during job invocation
> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>         at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>         at
> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>         at
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>         at
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>         at
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>         at
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>         at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>         at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>         at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>         at
> org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>         at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>         ... 13 more
> ========================================================================
>
> My beam version is below.
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> apache-beam==2.15.0
> =======================================================================
>
> I have my harness container ready on the registry.
>
> =======================================================================
> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> NAME            DESCRIPTION    STARS    OFFICIAL    AUTOMATED
> beam/python3                   0
> =======================================================================
>
> Flink is ready on a separate cluster.
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> tcp    LISTEN     0      128      :::8081                 :::*
> =======================================================================
>
> My debian version.
>
> =======================================================================
> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> 9.11
> =======================================================================
>
> My code snippet is below.
>
> =======================================================================
>     options = PipelineOptions([
>                   "--runner=FlinkRunner",
>                   "--flink_version=1.8",
>                   "--flink_master_url=localhost:8081"
>               ])
>
>     with beam.Pipeline(options=options) as p:
>
>         (p | beam.Create(["Hello World"]))
> =======================================================================
>
> Are there any other settings I should look at?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
> Twitter icon] <https://twitter.com/yuwtennis>
>
