Will defer to the release manager; one reason to cherry-pick is that 2.24.0 will be the last release with Python 2 support, so Py2 users of Portable Python Local Runner might appreciate the fix, since they won't be able to use the next release.
On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov <[email protected]> wrote:

+Daniel as in charge of 2.24 per dev@ thread.

On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <[email protected]> wrote:

The PR is merged.

Do folks think this warrants being cherry-picked into v2.24? My hunch is yes, because one of the runners (the local portable Python runner) is essentially broken for any production workload - it works only if your pipeline has no dependencies.

On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <[email protected]> wrote:

FWIW I sent a PR to fix this: https://github.com/apache/beam/pull/12571

However, I'm not up to date on the portable test infrastructure and would appreciate guidance on what tests I can add for this.

On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <[email protected]> wrote:

(FYI Sam <[email protected]>)

On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <[email protected]> wrote:

Ok, I found the bug, and now I don't understand how it could have possibly ever worked. And if this was never tested, then I don't understand why it works after fixing this one bug :)

Basically, the Python ArtifactStaging/RetrievalService uses FileSystems.open() to read the artifacts to be staged, and FileSystems.open() by default decompresses compressed files based on their extension.

I found two such services - one in Python and one in Java. Is the Python one used with an embedded job endpoint and the Java one otherwise? I haven't inspected the Java one, but fixing the Python one does the trick.

The fix is this patch:

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf534c3..1f3ec1c0b0 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 5bf3282250..2684235be0 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
             self.provision_info.provision_info, worker_manager),
         self.control_server)
 
+    def open_uncompressed(f):
+      return filesystems.FileSystems.open(
+          f, compression_type=CompressionTypes.UNCOMPRESSED)
+
     beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
         artifact_service.ArtifactRetrievalService(
-            file_reader=filesystems.FileSystems.open),
+            file_reader=open_uncompressed),
         self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(
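(An aside to make the patch concrete: FileSystems.open() defaults to compression_type=CompressionTypes.AUTO, which infers compression from the file extension - which is why a staged .tar.gz comes back out of the artifact service gunzipped. A minimal, self-contained sketch of that behavior; the temp-file setup is illustrative only, not from the thread:)

import gzip
import os
import tempfile

from apache_beam.io import filesystems
from apache_beam.io.filesystems import CompressionTypes

# Write a small gzip file whose name ends in .gz.
fd, path = tempfile.mkstemp(suffix='.txt.gz')
os.close(fd)
with gzip.open(path, 'wb') as f:
  f.write(b'hello')

# Default is CompressionTypes.AUTO: compression is inferred from the
# extension, so the bytes come back transparently decompressed.
with filesystems.FileSystems.open(path) as f:
  assert f.read() == b'hello'

# UNCOMPRESSED yields the raw on-disk bytes (note the gzip magic number),
# which is what an artifact service must serve for pip to succeed later.
with filesystems.FileSystems.open(
    path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
  assert f.read()[:2] == b'\x1f\x8b'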
On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <[email protected]> wrote:

Hi Maximilian,

Thank you - it works fine with the embedded Flink runner (per below; seems like it's not using Docker for running Python code? What is it using, then?).

However, the original bug appears to be wider than I thought - it is also present if I run with --runner=FlinkRunner --environment_type=DOCKER. Seems like something is very broken in local Docker execution in general - I haven't yet verified whether the same error happens when running on a remote Flink cluster.

Trying to build my own SDK containers with some more debugging so I can figure out what's going on...

On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <[email protected]> wrote:

Looks like you ran into a bug.

You could just run your program without specifying any arguments, since running with Python's FnApiRunner should be enough.

Alternatively, how about trying to run the same pipeline with the FlinkRunner? Use --runner=FlinkRunner and do not specify an endpoint. It will run the Python code embedded (loopback environment) without additional containers.

Cheers,
Max

On 10.08.20 21:59, Eugene Kirpichov wrote:

Thanks Valentyn!

Good to know that this is a bug (I'll file one), and that Dataflow has an experimental way to use custom containers. I'll try that.

On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev <[email protected]> wrote:

Hi Eugene,

Good to hear from you. The experience you are describing with the portable runner + Docker containers in local execution mode is most certainly a bug; if you have not opened an issue for it, please do so and feel free to cc me.

I can also reproduce the bug, and likewise didn't see anything obvious immediately - this needs some debugging.

cc: +Ankur Goenka and +Kyle Weaver, who recently worked on the portable runner and may be interested.

By the way, you should be able to use custom containers with Dataflow if you set --experiments=use_runner_v2.
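(To make that concrete: a Dataflow invocation with the experiment enabled might look like the sketch below. The project/region/bucket/image values are placeholders, and --worker_harness_container_image is assumed to be the custom-container flag of that SDK generation - check the worker options of your SDK version.)

$ python3 main.py \
    --runner=DataflowRunner \
    --project=<project> \
    --region=<region> \
    --temp_location=gs://<bucket>/tmp \
    --requirements_file=requirements.txt \
    --experiments=use_runner_v2 \
    --worker_harness_container_image=<custom-image>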
On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov <[email protected]> wrote:

(cc'ing Sam, with whom I'm working on this atm)

FWIW I'm still stumped. I've looked through the Python, Go, and Java code in the Beam repo having anything to do with gzipping/unzipping, and none of it appears to be used in the artifact staging/retrieval codepaths. I also can't find any mention of compression/decompression in the container boot code. My next step will be to add a bunch of debugging, rebuild the containers, and see what the artifact services think they're serving.

On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov <[email protected]> wrote:

Thanks Austin! Good stuff - though note that I am /not/ using custom containers; I'm just trying to get the basic stuff to work: a Python pipeline with a simple requirements.txt file. Feels like this should work out of the box - I must be doing something wrong.

On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett <[email protected]> wrote:

The only person I know of potentially doing applied work with custom containers is @OrielResearch Eila Arich-Landkof (there must be others)!

As a plug for her and @BeamSummit: I think enough related material (with Conda specifics) will be covered in --> https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/

I'm sure others will have more things to say that are actually helpful, on-list, before that occurs (~3 weeks).

On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov <[email protected]> wrote:

Hi old Beam friends,

I left Google to work on climate change <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U> and am now doing a short engagement with Pachama <https://pachama.com/>. Right now I'm trying to get a Beam Python pipeline to work. The pipeline will use fancy requirements and native dependencies, and we plan to run it on Cloud Dataflow (so custom containers are not yet an option), so I'm going straight for the direct PortableRunner as per https://beam.apache.org/documentation/runtime/environments/.

Basically, I can't get a minimal Beam program with a minimal requirements.txt file to work - the .tar.gz of the dependency mysteriously ends up being ungzipped and non-installable inside the Docker container running the worker. Details below.

=== main.py ===
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    (p | 'Create' >> beam.Create(['Hello'])
       | 'Write' >> beam.io.WriteToText('/tmp'))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

=== requirements.txt ===
alembic

When I run the program:

$ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt

I get some normal output and then:

INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'  File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n    untar_file(filename, location)\n  File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n    tar = tarfile.open(filename, mode)\n  File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n    return func(name, filemode, fileobj, **kwargs)\n  File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n    raise ReadError("not a gzip file")\ntarfile.ReadError: not a gzip file\n2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'

This greatly puzzled me and, after some looking, I found something really surprising. Here is the package in the /directory to be staged/:

$ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
$ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
-rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...

So far so good.
But here is the same file inside the Docker container (I ssh'd into the dead container <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):

# file /tmp/staged/alembic-1.4.2.tar.gz
/tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
# ls -l /tmp/staged/alembic-1.4.2.tar.gz
-rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz

The file has clearly been unzipped, and now of course pip can't install it! What's going on here? Am I using the direct/portable runner combination wrong?

Thanks!

--
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
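(For anyone reproducing the diagnosis above without file(1): the same check in a few lines of Python. The is_gzipped helper is illustrative, not from the thread; the path is the one from the container listing.)

# Does the file start with the gzip magic number (0x1f 0x8b)?
def is_gzipped(path):
  with open(path, 'rb') as f:
    return f.read(2) == b'\x1f\x8b'

# True for the host's dataflow-requirements-cache copy; False inside the
# broken container, where the artifact service gunzipped it in transit.
print(is_gzipped('/tmp/staged/alembic-1.4.2.tar.gz'))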
