On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
> > Is not this flag set automatically for the portable runner
>
> Yes, the flag is set automatically, but it has been broken before and
> likely will be again. It just adds additional complexity to portable
> runners. There is no other portability API than the Fn API. This flag
> historically had its justification, but seems obsolete now.

I disagree that this flag is obsolete. It still serves a purpose for
batch users of the Dataflow runner, and they are a decent chunk of Beam
Python users. I agree with switching the default, but I would like
enough time to decouple the flag from the core code (with a quick
search I saw two instances, related to Read and Create), to test the
changes, and then to switch the default.

> An isinstance check might be smarter, but does not get rid of the
> root of the problem.

I might be wrong, but IIUC it would temporarily resolve the reported
issues. Is that not accurate?

> -Max
>
> On 17.09.19 14:20, Ahmet Altay wrote:
> > Could you make that change and see if it would have addressed the
> > issue here?
> >
> > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
> >
> >     The flag is automatically set, but not in a smart way. Taking
> >     another look at the code, a more resilient fix would be to just
> >     check whether the runner is an instance of PortableRunner.
> >
> >     Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >
> >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
> >
> >         Is not this flag set automatically for the portable runner here [1]?
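The isinstance-based check described in this thread could look roughly like the following standalone sketch. The classes below are stand-ins for Beam's real runner classes (Beam does have a `PortableRunner`, but `ensure_fn_api` and the stub hierarchy here are hypothetical), so this illustrates the approach only, not Beam's actual code:

```python
class PipelineRunner:
    """Stand-in for Beam's base runner class."""

class PortableRunner(PipelineRunner):
    """Stand-in for the portable (Fn API) runner."""

class LegacyDataflowRunner(PipelineRunner):
    """Stand-in for a non-portable runner."""

def ensure_fn_api(runner, experiments):
    """Enable beam_fn_api based on the runner's type, not a flag or name match."""
    if isinstance(runner, PortableRunner) and "beam_fn_api" not in experiments:
        experiments.append("beam_fn_api")
    return experiments

print(ensure_fn_api(PortableRunner(), []))        # ['beam_fn_api']
print(ensure_fn_api(LegacyDataflowRunner(), []))  # []
```

Compared with matching on a runner name string, an isinstance check also covers subclasses of the portable runner automatically, which is the resilience Kyle is after.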
> >         [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >
> >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
> >
> >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> >             >
> >             > +1 for making --experiments=beam_fn_api the default.
> >             >
> >             > Can the Dataflow runner driver just remove the setting
> >             > if it is not compatible?
> >
> >             The tricky bit would be undoing the differences in graph
> >             construction due to this flag flip. But I would be in
> >             favor of changing the default (probably just removing the
> >             flag) and moving the non-portability parts into the
> >             Dataflow runner itself. (It looks like the key
> >             differences here are for the Create and Read transforms.)
> >
> >             > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
> >             >>
> >             >> +dev
> >             >>
> >             >> The beam_fn_api flag and the way it is automatically
> >             >> set are error-prone. Is there anything that prevents
> >             >> us from removing it? I understand that some runners,
> >             >> e.g. the Dataflow runner, have two modes of executing
> >             >> Python pipelines (legacy and portable), but at this
> >             >> point it seems clear that the portability mode should
> >             >> be the default.
> >             >>
> >             >> Cheers,
> >             >> Max
> >             >>
> >             >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> >             >>
> >             >>     Kyle
> >             >>
> >             >>     Thank you for the assistance.
> >             >>     By specifying "experiments" in PipelineOptions,
> >             >>
> >             >>     ==========================================
> >             >>     options = PipelineOptions([
> >             >>         "--runner=FlinkRunner",
> >             >>         "--flink_version=1.8",
> >             >>         "--flink_master_url=localhost:8081",
> >             >>         "--experiments=beam_fn_api"
> >             >>     ])
> >             >>     ==========================================
> >             >>
> >             >>     I was able to submit the job successfully.
> >             >>
> >             >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >             >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> >             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
> >             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
> >             >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
> >             >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> >             >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> >             >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> >             >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
> >             >>
> >             >>     Thanks,
> >             >>     Yu Watanabe
> >             >>
> >             >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
> >             >>
> >             >>         Try adding "--experiments=beam_fn_api" to your pipeline
> >             >>         options. (This is a known issue with Beam 2.15 that
> >             >>         will be fixed in 2.16.)
> >             >>
> >             >>         Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >             >>
> >             >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.ten...@gmail.com> wrote:
> >             >>
> >             >>             Hello.
> >             >>
> >             >>             I am trying to spin up the Flink runner, but it
> >             >>             looks like data serialization is failing. I would
> >             >>             like to ask for help getting past this error.
> >             >>             ========================================================================
> >             >>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> >             >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> >             >>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> >             >>                 at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> >             >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> >             >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> >             >>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> >             >>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> >             >>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> >             >>                 at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> >             >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> >             >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> >             >>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> >             >>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >             >>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >             >>                 at java.lang.Thread.run(Thread.java:748)
> >             >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> >             >>                 at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> >             >>                 at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> >             >>                 at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> >             >>                 at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> >             >>                 at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> >             >>                 at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> >             >>                 at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> >             >>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> >             >>                 ... 13 more
> >             >>             ========================================================================
> >             >>
> >             >>             My Beam version is below.
> >             >>
> >             >>             =======================================================================
> >             >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> >             >>             apache-beam==2.15.0
> >             >>             =======================================================================
> >             >>
> >             >>             I have my harness container ready on the registry.
> >             >>             =======================================================================
> >             >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> >             >>             NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
> >             >>             beam/python3                 0
> >             >>             =======================================================================
> >             >>
> >             >>             Flink is ready on a separate cluster.
> >             >>
> >             >>             =======================================================================
> >             >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> >             >>             tcp    LISTEN   0   128   :::8081   :::*
> >             >>             =======================================================================
> >             >>
> >             >>             My Debian version:
> >             >>
> >             >>             =======================================================================
> >             >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> >             >>             9.11
> >             >>             =======================================================================
> >             >>
> >             >>             My code snippet is below.
> >             >>
> >             >>             =======================================================================
> >             >>             options = PipelineOptions([
> >             >>                 "--runner=FlinkRunner",
> >             >>                 "--flink_version=1.8",
> >             >>                 "--flink_master_url=localhost:8081"
> >             >>             ])
> >             >>
> >             >>             with beam.Pipeline(options=options) as p:
> >             >>
> >             >>                 (p | beam.Create(["Hello World"]))
> >             >>             =======================================================================
> >             >>
> >             >>             Would there be any other settings I should look for?
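As context for the flag lists being passed around in this thread: Beam's PipelineOptions hands such a list to an argparse-based parser. The following standalone sketch only illustrates how an `--experiments` value could be pulled out of a flag list of this shape; `parse_experiments` is a hypothetical helper, not Beam's actual parsing code:

```python
def parse_experiments(flags):
    """Collect the comma-separated values of every --experiments=... flag."""
    experiments = []
    for flag in flags:
        if flag.startswith("--experiments="):
            # Split off the flag name, then split the value on commas.
            experiments.extend(flag.split("=", 1)[1].split(","))
    return experiments

flags = [
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
    "--experiments=beam_fn_api",
]
print(parse_experiments(flags))  # ['beam_fn_api']
```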
> >             >>             Thanks,
> >             >>             Yu Watanabe
> >             >>
> >             >>             --
> >             >>             Yu Watanabe
> >             >>             Weekend Freelancer who loves to challenge building data platform
> >             >>             yu.w.ten...@gmail.com
> >             >>             LinkedIn: https://www.linkedin.com/in/yuwatanabe1
> >             >>             Twitter: https://twitter.com/yuwtennis
> >             >>
> >             >>     --
> >             >>     Yu Watanabe
> >             >>     Weekend Freelancer who loves to challenge building data platform
> >             >>     yu.w.ten...@gmail.com
> >             >>     LinkedIn: https://www.linkedin.com/in/yuwatanabe1
> >             >>     Twitter: https://twitter.com/yuwtennis
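A footnote on the FAILED_TO_UNCOMPRESS stack trace earlier in the thread: snappy is rejecting bytes that were not framed the way it expects, which appears consistent with the producer and consumer disagreeing on serialization (what the beam_fn_api flag toggles for Create/Read). The same generic failure mode can be reproduced with zlib from the standard library (used here only as a stand-in, since python-snappy may not be installed; the payload bytes are an arbitrary illustration, not Beam's wire format):

```python
import zlib

# Arbitrary bytes that were never zlib-compressed, standing in for a
# serialized payload produced under a different framing than the reader
# expects.
payload = b"serialized-BoundedSource-in-some-other-framing"

try:
    zlib.decompress(payload)
    print("decompressed fine")
except zlib.error as exc:
    # Like snappy above, zlib rejects bytes it did not produce.
    print("decompression failed:", exc)
```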