Will defer to the release manager; one reason to cherry-pick is that 2.24.0
will be the last release with Python 2 support, so Py2 users of Portable
Python Local Runner might appreciate the fix, since they won't be able to
use the next release.
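
For anyone following along, the failure mode described downthread (an artifact reader that transparently gunzips files based on their extension, so the worker receives a bare tar where pip expects a .tar.gz) can be sketched in plain Python. This is a simplified stand-in using the stdlib gzip module, not Beam's actual FileSystems API; the two reader functions are hypothetical illustrations of the before/after behavior:

```python
import gzip
import os
import tempfile

def open_by_extension(path):
    # Mimics the buggy default: transparently decompress
    # anything that looks gzip-compressed by its extension.
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    return open(path, 'rb')

def open_uncompressed(path):
    # Mimics the fix: always hand back the raw bytes.
    return open(path, 'rb')

# Create a fake "sdist": a gzip-compressed payload named like a pip package.
tmp = tempfile.mkdtemp()
path = os.path.join(tmp, 'alembic-1.4.2.tar.gz')
with gzip.open(path, 'wb') as f:
    f.write(b'fake tar contents')

staged_buggy = open_by_extension(path).read()   # gzip layer silently stripped
staged_fixed = open_uncompressed(path).read()   # raw .tar.gz bytes preserved

assert staged_buggy == b'fake tar contents'     # no longer gzip data
assert staged_fixed[:2] == b'\x1f\x8b'          # gzip magic number intact
```

The staged file keeps its .tar.gz name either way, which is why pip's tarfile.ReadError ("not a gzip file") is the first visible symptom.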

On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov <[email protected]>
wrote:

> +Daniel, who is in charge of 2.24 per the dev@ thread.
>
> On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> The PR is merged.
>>
>> Do folks think this warrants being cherry-picked into v2.24? My hunch is
>> yes, because one of the runners (the local portable Python runner) is
>> basically broken for any production workload (it works only if your
>> pipeline has no dependencies).
>>
>> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
>>>
>>> However, I'm not up to date on the portable test infrastructure and
>>> would appreciate guidance on what tests I can add for this.
>>>
>>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> (FYI Sam <[email protected]>)
>>>>
>>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>
>>>>> Ok I found the bug, and now I don't understand how it could have
>>>>> possibly ever worked. And if this was never tested, then I don't understand
>>>>> why it works after fixing this one bug :)
>>>>>
>>>>> Basically the Python ArtifactStaging/RetrievalService uses
>>>>> FileSystems.open() to read the artifacts to be staged, and
>>>>> FileSystems.open() by default decompresses compressed files based on their
>>>>> extension.
>>>>> I found two such services - one in Python and one in Java. Is the Python
>>>>> one used with an embedded job endpoint and the Java one otherwise? I haven't
>>>>> inspected the Java one, but fixing Python does the trick.
>>>>>
>>>>> The fix is this patch:
>>>>>
>>>>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>>> index f2bbf534c3..1f3ec1c0b0 100644
>>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>>> @@ -41,6 +41,7 @@ import grpc
>>>>>  from future.moves.urllib.request import urlopen
>>>>>
>>>>>  from apache_beam.io import filesystems
>>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>>  from apache_beam.portability import common_urns
>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>>>>      self._root = root
>>>>>
>>>>>    def file_reader(self, path):
>>>>> -    return filesystems.FileSystems.open(path)
>>>>> +    return filesystems.FileSystems.open(
>>>>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>>>>>
>>>>>    def file_writer(self, name=None):
>>>>>      full_path = filesystems.FileSystems.join(self._root, name)
>>>>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>>> index 5bf3282250..2684235be0 100644
>>>>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>>> @@ -45,6 +45,7 @@ from typing import overload
>>>>>  import grpc
>>>>>
>>>>>  from apache_beam.io import filesystems
>>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>>  from apache_beam.portability import common_urns
>>>>>  from apache_beam.portability import python_urns
>>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>>> @@ -464,9 +465,13 @@ class GrpcServer(object):
>>>>>                  self.provision_info.provision_info, worker_manager),
>>>>>              self.control_server)
>>>>>
>>>>> +      def open_uncompressed(f):
>>>>> +        return filesystems.FileSystems.open(
>>>>> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
>>>>> +
>>>>>        beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>>>>>            artifact_service.ArtifactRetrievalService(
>>>>> -              file_reader=filesystems.FileSystems.open),
>>>>> +              file_reader=open_uncompressed),
>>>>>            self.control_server)
>>>>>
>>>>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Maximilian,
>>>>>>
>>>>>> Thank you - it works fine with the embedded Flink runner (per below, it
>>>>>> seems like it's not using Docker for running Python code? What is it using
>>>>>> then?).
>>>>>>
>>>>>> However, the original bug appears to be wider than I thought - it is
>>>>>> also present if I run --runner=FlinkRunner --environment_type=DOCKER.
>>>>>> Seems like something is very broken in local Docker execution in general -
>>>>>> I haven't yet verified whether the same error will happen when running on
>>>>>> a remote Flink cluster.
>>>>>>
>>>>>> Trying to build my own SDK containers with some more debugging so I
>>>>>> can figure out what's going on...
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Looks like you ran into a bug.
>>>>>>>
>>>>>>> You could just run your program without specifying any arguments,
>>>>>>> since
>>>>>>> running with Python's FnApiRunner should be enough.
>>>>>>>
>>>>>>> Alternatively, how about trying to run the same pipeline with the
>>>>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an
>>>>>>> endpoint.
>>>>>>> It will run the Python code embedded (loopback environment) without
>>>>>>> additional containers.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>>>>>> > Thanks Valentyn!
>>>>>>> >
>>>>>>> > Good to know that this is a bug (I'll file a bug), and that Dataflow
>>>>>>> > has an experimental way to use custom containers. I'll try that.
>>>>>>> >
>>>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>>>>>> > <[email protected]> wrote:
>>>>>>> >
>>>>>>> >     Hi Eugene,
>>>>>>> >
>>>>>>> >     Good to hear from you. The experience you are describing on Portable
>>>>>>> >     Runner + Docker container in local execution mode is most certainly
>>>>>>> >     a bug; if you have not opened an issue on it, please do so and feel
>>>>>>> >     free to cc me.
>>>>>>> >
>>>>>>> >     I can also reproduce the bug and likewise didn't see anything
>>>>>>> >     obvious immediately, this needs some debugging.
>>>>>>> >
>>>>>>> >     cc: +Ankur Goenka <[email protected]> +Kyle Weaver
>>>>>>> >     <[email protected]>, who recently worked on Portable Runner
>>>>>>> >     and may be interested.
>>>>>>> >
>>>>>>> >     By the way, you should be able to use custom containers with
>>>>>>> >     Dataflow, if you set --experiments=use_runner_v2.
>>>>>>> >
>>>>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>>>>>> >     <[email protected]> wrote:
>>>>>>> >
>>>>>>> >         (cc'ing Sam with whom I'm working on this atm)
>>>>>>> >
>>>>>>> >         FWIW I'm still stumped. I've looked through Python, Go and Java
>>>>>>> >         code in the Beam repo having anything to do with
>>>>>>> >         gzipping/unzipping, and none of it appears to be used in the
>>>>>>> >         artifact staging/retrieval codepaths. I also can't find any
>>>>>>> >         mention of compression/decompression in the container boot code.
>>>>>>> >         My next step will be to add a bunch of debugging, rebuild the
>>>>>>> >         containers, and see what the artifact services think they're
>>>>>>> >         serving.
>>>>>>> >
>>>>>>> >
>>>>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>>>>>> >         <[email protected]> wrote:
>>>>>>> >
>>>>>>> >             Thanks Austin! Good stuff - though note that I am
>>>>>>> >             /not/ using custom containers; I'm just trying to get the
>>>>>>> >             basic stuff to work, a Python pipeline with a simple
>>>>>>> >             requirements.txt file. Feels like this should work
>>>>>>> >             out-of-the-box, so I must be doing something wrong.
>>>>>>> >
>>>>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>>>>>> >             <[email protected]
>>>>>>> >             <mailto:[email protected]>> wrote:
>>>>>>> >
>>>>>>> >                 I only know of @OrielResearch Eila Arich-Landkof
>>>>>>> >                 <[email protected]> potentially doing applied work
>>>>>>> >                 with custom containers (there must be others)!
>>>>>>> >
>>>>>>> >                 For a plug for her and @BeamSummit -- I think enough
>>>>>>> >                 related material will be talked about (with Conda
>>>>>>> >                 specifics) in -->
>>>>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>>>>>> >
>>>>>>> >                 I'm sure others will have more things to say that are
>>>>>>> >                 actually helpful, on-list, before that occurs (~3 weeks).
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>>>>>> >                 <[email protected] <mailto:[email protected]>>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> >                     Hi old Beam friends,
>>>>>>> >
>>>>>>> >                     I left Google to work on climate change
>>>>>>> >                     <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>>>>>> >                     and am now doing a short engagement with Pachama
>>>>>>> >                     <https://pachama.com/>. Right now I'm trying to get
>>>>>>> >                     a Beam Python pipeline to work; the pipeline will
>>>>>>> >                     use fancy requirements and native dependencies, and
>>>>>>> >                     we plan to run it on Cloud Dataflow (so custom
>>>>>>> >                     containers are not yet an option), so I'm going
>>>>>>> >                     straight for the direct PortableRunner as per
>>>>>>> >                     https://beam.apache.org/documentation/runtime/environments/.
>>>>>>> >
>>>>>>> >                     Basically I can't get a minimal Beam program
>>>>>>> with a
>>>>>>> >                     minimal requirements.txt file to work - the
>>>>>>> .tar.gz
>>>>>>> >                     of the dependency mysteriously ends up being
>>>>>>> >                     ungzipped and non-installable inside the Docker
>>>>>>> >                     container running the worker. Details below.
>>>>>>> >
>>>>>>> >                     === main.py ===
>>>>>>> >                     import argparse
>>>>>>> >                     import logging
>>>>>>> >
>>>>>>> >                     import apache_beam as beam
>>>>>>> >                     from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>> >                     from apache_beam.options.pipeline_options import SetupOptions
>>>>>>> >
>>>>>>> >                     def run(argv=None):
>>>>>>> >                         parser = argparse.ArgumentParser()
>>>>>>> >                         known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>>> >
>>>>>>> >                         pipeline_options = PipelineOptions(pipeline_args)
>>>>>>> >                         pipeline_options.view_as(SetupOptions).save_main_session = True
>>>>>>> >
>>>>>>> >                         with beam.Pipeline(options=pipeline_options) as p:
>>>>>>> >                             (p | 'Create' >> beam.Create(['Hello'])
>>>>>>> >                                | 'Write' >> beam.io.WriteToText('/tmp'))
>>>>>>> >
>>>>>>> >                     if __name__ == '__main__':
>>>>>>> >                         logging.getLogger().setLevel(logging.INFO)
>>>>>>> >                         run()
>>>>>>> >
>>>>>>> >                     === requirements.txt ===
>>>>>>> >                     alembic
>>>>>>> >
>>>>>>> >                     When I run the program:
>>>>>>> >                     $ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt
>>>>>>> >
>>>>>>> >
>>>>>>> >                     I get some normal output and then:
>>>>>>> >
>>>>>>> >
>>>>>>> >                     INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>>>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n    untar_file(filename, location)\n
>>>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n    tar = tarfile.open(filename, mode)\n
>>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n    return func(name, filemode, fileobj, **kwargs)\n
>>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n    raise ReadError("not a gzip file")\n
>>>>>>> >                     tarfile.ReadError: not a gzip file\n
>>>>>>> >                     2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'
>>>>>>> >
>>>>>>> >                     This greatly puzzled me and, after some looking, I
>>>>>>> >                     found something really surprising. Here is the
>>>>>>> >                     package in the /directory to be staged/:
>>>>>>> >
>>>>>>> >                     $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>>> >                     ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
>>>>>>> >                     $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>>>>> >
>>>>>>> >                     So far so good. But here is the same file inside the
>>>>>>> >                     Docker container (I ssh'd into the dead container
>>>>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>>>>> >
>>>>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>>>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>>>>>> >
>>>>>>> >                     The file has clearly been unzipped, and now of course
>>>>>>> >                     pip can't install it! What's going on here? Am I
>>>>>>> >                     using the direct/portable runner combination wrong?
>>>>>>> >
>>>>>>> >                     Thanks!
>>>>>>> >
>>>>>>> >                     --
>>>>>>> >                     Eugene Kirpichov
>>>>>>> >                     http://www.linkedin.com/in/eugenekirpichov
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >             --
>>>>>>> >             Eugene Kirpichov
>>>>>>> >             http://www.linkedin.com/in/eugenekirpichov
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >         --
>>>>>>> >         Eugene Kirpichov
>>>>>>> >         http://www.linkedin.com/in/eugenekirpichov
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> > Eugene Kirpichov
>>>>>>> > http://www.linkedin.com/in/eugenekirpichov
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eugene Kirpichov
>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eugene Kirpichov
>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>
>>>>
>>>>
>>>> --
>>>> Eugene Kirpichov
>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>
