On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:

> > Is not this flag set automatically for the portable runner
>
> Yes, the flag is set automatically, but it has been broken before and
> likely will be again. It just adds additional complexity to portable
> Runners. There is no other portability API than the Fn API. This flag
> historically had its justification, but seems obsolete now.
>

I disagree that this flag is obsolete. It still serves a purpose for batch
users on the Dataflow runner, and that is a decent chunk of Beam Python
users.

I agree with switching the default. I would like enough time to decouple
the flag from the core code first (a quick search turned up two instances,
related to Read and Create), test those changes, and then switch the
default.
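To make the decoupling concrete, here is a minimal sketch of one way it could look: instead of Create and Read branching on the experiment inside core pipeline code, each runner declares whether it takes the portable (Fn API) path. All class and method names below are illustrative stand-ins, not the actual Beam classes.

```python
class PipelineRunner:
    """Stand-in for apache_beam.runners.PipelineRunner (illustrative only)."""

    def is_fnapi_compatible(self):
        # With a flipped default, portable (Fn API) execution is assumed.
        return True


class LegacyDataflowBatchRunner(PipelineRunner):
    """Hypothetical legacy runner that opts out of the portable path."""

    def is_fnapi_compatible(self):
        return False


def use_fnapi(runner, experiments=()):
    # The experiment can still force the portable path, but the default
    # now comes from the runner instead of core pipeline code.
    return 'beam_fn_api' in experiments or runner.is_fnapi_compatible()
```

Under this arrangement the flag could eventually be removed entirely, since the only remaining consumer would be the Dataflow runner's legacy batch mode.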


>
> An isinstance check might be smarter, but does not get rid of the root
> of the problem.
>

I might be wrong, but IIUC it would at least temporarily resolve the
reported issues. Is that not accurate?
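For reference, a minimal sketch of the isinstance-based check being discussed, assuming the flag is applied during pipeline construction as in pipeline.py. The classes here are stand-ins for the real Beam runner classes, and real code would also need to preserve user-supplied experiments.

```python
class PipelineRunner:
    """Stand-in base class; the real one is apache_beam.runners.PipelineRunner."""


class PortableRunner(PipelineRunner):
    """Stand-in for apache_beam.runners.portability.portable_runner.PortableRunner."""


def maybe_add_fn_api_experiment(runner, experiments):
    """Auto-enable beam_fn_api only for runners that are actually portable."""
    experiments = list(experiments)
    if isinstance(runner, PortableRunner) and 'beam_fn_api' not in experiments:
        experiments.append('beam_fn_api')
    return experiments
```

The difference from the current check is that subclasses of PortableRunner (like the Flink runner's entry point) are covered automatically, rather than relying on matching runner names.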


>
> -Max
>
> On 17.09.19 14:20, Ahmet Altay wrote:
> > Could you make that change and see if it would have addressed the issue
> > here?
> >
> > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com
> > <mailto:kcwea...@google.com>> wrote:
> >
> >     The flag is automatically set, but not in a smart way. Taking
> >     another look at the code, a more resilient fix would be to just
> >     check if the runner isinstance of PortableRunner.
> >
> >     Kyle Weaver | Software Engineer | github.com/ibzib
> >     <http://github.com/ibzib> | kcwea...@google.com
> >     <mailto:kcwea...@google.com>
> >
> >
> >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com
> >     <mailto:al...@google.com>> wrote:
> >
> >         Is not this flag set automatically for the portable runner here
> >         [1] ?
> >
> >         [1]
> >
> https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >
> >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw
> >         <rober...@google.com <mailto:rober...@google.com>> wrote:
> >
> >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org
> >             <mailto:t...@apache.org>> wrote:
> >              >
> >              > +1 for making --experiments=beam_fn_api default.
> >              >
> >              > Can the Dataflow runner driver just remove the setting if
> >             it is not compatible?
> >
> >             The tricky bit would be undoing the differences in graph
> >             construction
> >             due to this flag flip. But I would be in favor of changing
> >             the default
> >             (probably just removing the flag) and moving the
> >             non-portability parts
> >             into the dataflow runner itself. (It looks like the key
> >             differences
> >             here are for the Create and Read transforms.)
> >
> >              > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels
> >             <m...@apache.org <mailto:m...@apache.org>> wrote:
> >              >>
> >              >> +dev
> >              >>
> >              >> The beam_fn_api flag and the way it is automatically set
> >             is error-prone.
> >              >> Is there anything that prevents us from removing it? I
> >             understand that
> >              >> some Runners, e.g. Dataflow Runner have two modes of
> >             executing Python
> >              >> pipelines (legacy and portable), but at this point it
> >             seems clear that
> >              >> the portability mode should be the default.
> >              >>
> >              >> Cheers,
> >              >> Max
> >              >>
> >              >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
> >              >> <yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>>
> >             wrote:
> >              >>
> >              >>     Kyle
> >              >>
> >              >>     Thank you for the assistance.
> >              >>
> >              >>     By specifying "experiments" in PipelineOptions ,
> >              >>     ==========================================
> >              >>              options = PipelineOptions([
> >              >>                            "--runner=FlinkRunner",
> >              >>                            "--flink_version=1.8",
> >              >>
> >             "--flink_master_url=localhost:8081",
> >              >>                            "--experiments=beam_fn_api"
> >              >>                        ])
> >              >>     ==========================================
> >              >>
> >              >>     I was able to submit the job successfully.
> >              >>
> >              >>     [grpc-default-executor-0] INFO
> >              >>     org.apache.beam.runners.flink.FlinkJobInvoker -
> >             Invoking job
> >              >>
> >
>  BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >              >>     [grpc-default-executor-0] INFO
> >              >>
> >
>  org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
> >              >>     Starting job invocation
> >              >>
> >
>  BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >              >>     [flink-runner-job-invoker] INFO
> >              >>     org.apache.beam.runners.flink.FlinkPipelineRunner -
> >             Translating
> >              >>     pipeline to Flink program.
> >              >>     [flink-runner-job-invoker] INFO
> >              >>
> >               org.apache.beam.runners.flink.FlinkExecutionEnvironments -
> >             Creating
> >              >>     a Batch Execution Environment.
> >              >>     [flink-runner-job-invoker] INFO
> >              >>
> >               org.apache.beam.runners.flink.FlinkExecutionEnvironments -
> >             Using
> >              >>     Flink Master URL localhost:8081.
> >              >>     [flink-runner-job-invoker] WARN
> >              >>
> >               org.apache.beam.runners.flink.FlinkExecutionEnvironments -
> No
> >              >>     default parallelism could be found. Defaulting to
> >             parallelism 1.
> >              >>     Please set an explicit parallelism with --parallelism
> >              >>     [flink-runner-job-invoker] INFO
> >              >>     org.apache.flink.api.java.ExecutionEnvironment - The
> >             job has 0
> >              >>     registered types and 0 default Kryo serializers
> >              >>     [flink-runner-job-invoker] INFO
> >              >>     org.apache.flink.configuration.Configuration -
> >             Config uses fallback
> >              >>     configuration key 'jobmanager.rpc.address' instead
> >             of key 'rest.address'
> >              >>     [flink-runner-job-invoker] INFO
> >              >>     org.apache.flink.runtime.rest.RestClient - Rest
> >             client endpoint started.
> >              >>     [flink-runner-job-invoker] INFO
> >              >>
> >               org.apache.flink.client.program.rest.RestClusterClient -
> >             Submitting
> >              >>     job 4e055a8878dda3f564a7b7c84d48510d (detached:
> false).
> >              >>
> >              >>     Thanks,
> >              >>     Yu Watanabe
> >              >>
> >              >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver
> >             <kcwea...@google.com <mailto:kcwea...@google.com>
> >              >>     <mailto:kcwea...@google.com
> >             <mailto:kcwea...@google.com>>> wrote:
> >              >>
> >              >>         Try adding "--experiments=beam_fn_api" to your
> >             pipeline options.
> >              >>         (This is a known issue with Beam 2.15 that will
> >             be fixed in 2.16.)
> >              >>
> >              >>         Kyle Weaver | Software Engineer |
> >             github.com/ibzib <http://github.com/ibzib>
> >              >>         <http://github.com/ibzib> | kcwea...@google.com
> >             <mailto:kcwea...@google.com>
> >              >>         <mailto:kcwea...@google.com
> >             <mailto:kcwea...@google.com>>
> >              >>
> >              >>
> >              >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
> >              >>         <yu.w.ten...@gmail.com
> >             <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
> >             <mailto:yu.w.ten...@gmail.com>>> wrote:
> >              >>
> >              >>             Hello.
> >              >>
> >              >>             I am trying to spin up the Flink runner, but
> >              >>             it looks like data serialization is failing.
> >              >>             I would like to ask for help getting past
> >              >>             this error.
> >              >>
> >              >>
> >
>  ========================================================================
> >              >>             [flink-runner-job-invoker] ERROR
> >              >>
> >
>  org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
> >              >>             - Error during job invocation
> >              >>
> >
>  BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> >              >>             java.lang.IllegalArgumentException: unable
> >             to deserialize
> >              >>             BoundedSource
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> >              >>                      at
> >              >>
> >
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >              >>                      at
> >              >>
> >
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >              >>                      at
> >             java.lang.Thread.run(Thread.java:748)
> >              >>             Caused by: java.io.IOException:
> >             FAILED_TO_UNCOMPRESS(5)
> >              >>                      at
> >              >>
> >
>  org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> >              >>                      at
> >              >>
> >               org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> >              >>                      at
> >              >>
> >               org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> >              >>                      at
> >             org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> >              >>                      at
> >              >>
> >
>  org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> >              >>                      at
> >              >>
> >
>  org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> >              >>                      at
> >              >>
> >
>  org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> >              >>                      at
> >              >>
> >
>  
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> >              >>                      ... 13 more
> >              >>
> >
>  ========================================================================
> >              >>
> >              >>             My beam version is below.
> >              >>
> >              >>
> >
>  =======================================================================
> >              >>             (python) ywatanabe@debian-09-00:~$ pip3
> >             freeze | grep
> >              >>             apache-beam
> >              >>             apache-beam==2.15.0
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>             I have my harness container ready on the
> >              >>             registry.
> >              >>
> >              >>
> >
>  =======================================================================
> >              >>             ywatanabe@debian-09-00:~$ docker search
> >              >> ywatanabe-docker-apache.bintray.io/python3
> >             <http://ywatanabe-docker-apache.bintray.io/python3>
> >              >>
> >               <http://ywatanabe-docker-apache.bintray.io/python3>
> >              >>             NAME                DESCRIPTION         STARS
> >              >>             OFFICIAL            AUTOMATED
> >              >>             beam/python3                            0
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>             Flink is ready on a separate cluster.
> >              >>
> >              >>
> >
>  =======================================================================
> >              >>             (python) ywatanabe@debian-09-00:~$ ss -atunp
> >             | grep 8081
> >              >>             tcp    LISTEN     0      128      :::8081
> >                   :::*
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>
> >              >>             My debian version.
> >              >>
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>             (python) ywatanabe@debian-09-00:~$ cat
> >             /etc/debian_version
> >              >>             9.11
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>
> >              >>             My code snippet is below.
> >              >>
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>                  options = PipelineOptions([
> >              >>                                "--runner=FlinkRunner",
> >              >>                                "--flink_version=1.8",
> >              >>             "--flink_master_url=localhost:8081"
> >              >>                            ])
> >              >>
> >              >>                  with beam.Pipeline(options=options) as
> p:
> >              >>
> >              >>                      (p | beam.Create(["Hello World"]))
> >              >>
> >
>  =======================================================================
> >              >>
> >              >>
> >              >>             Are there any other settings I should
> >              >>             look for?
> >              >>
> >              >>             Thanks,
> >              >>             Yu Watanabe
> >              >>
> >              >>             --
> >              >>             Yu Watanabe
> >              >>             Weekend Freelancer who loves to challenge
> >             building data
> >              >>             platform
> >              >> yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>
> >             <mailto:yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com
> >>
> >              >>             LinkedIn icon
> >             <https://www.linkedin.com/in/yuwatanabe1>
> >              >>             Twitter icon <https://twitter.com/yuwtennis>
> >              >>
> >              >>
> >              >>
> >              >>     --
> >              >>     Yu Watanabe
> >              >>     Weekend Freelancer who loves to challenge building
> >             data platform
> >              >> yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>
> >             <mailto:yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com
> >>
> >              >>     LinkedIn icon
> >             <https://www.linkedin.com/in/yuwatanabe1> Twitter icon
> >              >>     <https://twitter.com/yuwtennis>
> >              >>
> >
>
