Ok, I found the bug, and now I don't understand how it could possibly ever
have worked. And if this was never tested, then I don't understand why it
works after fixing this one bug :)
Basically, the Python ArtifactStaging/RetrievalService uses
FileSystems.open() to read the artifacts to be staged, and
FileSystems.open() by default decompresses compressed files based on their
extension.
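To illustrate (a minimal sketch, not code from the Beam repo; the path is
made up), compare the default behavior with an explicit
CompressionTypes.UNCOMPRESSED:

from apache_beam.io import filesystems
from apache_beam.io.filesystems import CompressionTypes

path = '/tmp/staged/alembic-1.4.2.tar.gz'

# Default compression_type is AUTO: the .gz extension makes the stream
# come back transparently gunzipped, i.e. as the plain tar bytes.
with filesystems.FileSystems.open(path) as f:
    unzipped_bytes = f.read()

# UNCOMPRESSED returns the raw gzipped bytes unchanged, which is what
# an artifact service needs to serve.
with filesystems.FileSystems.open(
    path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
    raw_bytes = f.read()

assert raw_bytes[:2] == b'\x1f\x8b'  # gzip magic is still intact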
I found two such services, one in Python and one in Java. Is the Python
one used with an embedded job endpoint and the Java one otherwise? I
haven't inspected the Java one, but fixing the Python one does the trick.
The fix is this patch:
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf534c3..1f3ec1c0b0 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
from future.moves.urllib.request import urlopen
from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_artifact_api_pb2
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
self._root = root
def file_reader(self, path):
- return filesystems.FileSystems.open(path)
+ return filesystems.FileSystems.open(
+ path, compression_type=CompressionTypes.UNCOMPRESSED)
def file_writer(self, name=None):
full_path = filesystems.FileSystems.join(self._root, name)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 5bf3282250..2684235be0 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
import grpc
from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
self.provision_info.provision_info, worker_manager),
self.control_server)
+ def open_uncompressed(f):
+ return filesystems.FileSystems.open(
+ f, compression_type=CompressionTypes.UNCOMPRESSED)
+
beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
artifact_service.ArtifactRetrievalService(
- file_reader=filesystems.FileSystems.open),
+ file_reader=open_uncompressed),
self.control_server)
self.data_plane_handler = data_plane.BeamFnDataServicer(
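A quick way to sanity-check the fix (a hypothetical snippet, not part of
the patch; it assumes a local staging root and the alembic artifact from
the thread below):

from apache_beam.runners.portability.artifact_service import BeamFilesystemHandler

handler = BeamFilesystemHandler(root='/tmp/staging')
# With the patch, file_reader returns the artifact bytes verbatim.
with handler.file_reader('/tmp/staging/alembic-1.4.2.tar.gz') as f:
    assert f.read(2) == b'\x1f\x8b', 'artifact was gunzipped in transit'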
On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <[email protected]> wrote:
> Hi Maximilian,
>
> Thank you - it works fine with the embedded Flink runner (per below, it
> seems it isn't using Docker to run the Python code? What is it using then?).
>
> However, the original bug appears to be wider than I thought - it is also
> present if I run --runner=FlinkRunner --environment_type=DOCKER. Seems like
> something is very broken in local Docker execution in general - I haven't
> yet verified whether the same error will happen when running on a remote
> Flink cluster.
>
> Trying to build my own SDK containers with some more debugging so I can
> figure out what's going on...
>
>
> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <[email protected]> wrote:
>
>> Looks like you ran into a bug.
>>
>> You could just run your program without specifying any arguments, since
>> running with Python's FnApiRunner should be enough.
>>
>> Alternatively, how about trying to run the same pipeline with the
>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>> It will run the Python code embedded (loopback environment) without
>> additional containers.
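>>
>> For example (assuming the main.py from further down this thread),
>> something like the following should run the pipeline without any
>> extra containers:
>>
>>   $ python3 main.py --runner=FlinkRunner --requirements_file=requirements.txt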
>>
>> Cheers,
>> Max
>>
>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>> > Thanks Valentyn!
>> >
>> > Good to know that this is a bug (I'll file one), and that Dataflow has
>> > an experimental way to use custom containers. I'll try that.
>> >
>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>> > <[email protected]> wrote:
>> >
>> > Hi Eugene,
>> >
>> > Good to hear from you. The experience you are describing with the
>> > Portable Runner + Docker container in local execution mode is most
>> > certainly a bug. If you have not opened an issue for it, please do so
>> > and feel free to cc me.
>> >
>> > I can also reproduce the bug and likewise didn't see anything
>> > obvious immediately; this needs some debugging.
>> >
>> > cc: +Ankur Goenka and +Kyle Weaver, who recently worked on the
>> > Portable Runner and may be interested.
>> >
>> > By the way, you should be able to use custom containers with
>> > Dataflow, if you set --experiments=use_runner_v2.
>> >
>> > On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>> > <[email protected]> wrote:
>> >
>> > (cc'ing Sam, with whom I'm working on this atm)
>> >
>> > FWIW I'm still stumped. I've looked through the Python, Go, and Java
>> > code in the Beam repo that has anything to do with
>> > gzipping/unzipping, and none of it appears to be used in the
>> > artifact staging/retrieval codepaths. I also can't find any
>> > mention of compression/decompression in the container boot code.
>> > My next step will be to add a bunch of debugging, rebuild the
>> > containers, and see what the artifact services think they're
>> > serving.
>> >
>> >
>> > On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>> > <[email protected]> wrote:
>> >
>> > Thanks Austin! Good stuff - though note that I am
>> > /not/ using custom containers; I'm just trying to get the
>> > basics to work: a Python pipeline with a simple
>> > requirements.txt file. It feels like this should work
>> > out of the box, so I must be doing something wrong.
>> >
>> > On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>> > <[email protected]> wrote:
>> >
>> > I believe only @OrielResearch Eila Arich-Landkof is
>> > potentially doing applied work with custom containers
>> > (there must be others)!
>> >
>> > As a plug for her and @BeamSummit: I think enough related material
>> > will be covered there (with Conda specifics) -->
>> > https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>> >
>> > I'm sure others on-list will have more things to say that are
>> > actually helpful before that occurs (~3 weeks).
>> >
>> >
>> >
>> > On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>> > <[email protected]> wrote:
>> >
>> > Hi old Beam friends,
>> >
>> > I left Google to work on climate change
>> > <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>> > and am now doing a short engagement with Pachama
>> > <https://pachama.com/>. Right now I'm trying to get a Beam Python
>> > pipeline to work. The pipeline will use fancy requirements and
>> > native dependencies, and we plan to run it on Cloud Dataflow (so
>> > custom containers are not yet an option), so I'm going straight for
>> > the direct PortableRunner as per
>> > https://beam.apache.org/documentation/runtime/environments/.
>> >
>> > Basically I can't get a minimal Beam program with a minimal
>> > requirements.txt file to work: the .tar.gz of the dependency
>> > mysteriously ends up being ungzipped and non-installable inside the
>> > Docker container running the worker. Details below.
>> >
>> > === main.py ===
>> > import argparse
>> > import logging
>> >
>> > import apache_beam as beam
>> > from apache_beam.options.pipeline_options import PipelineOptions
>> > from apache_beam.options.pipeline_options import SetupOptions
>> >
>> >
>> > def run(argv=None):
>> >   parser = argparse.ArgumentParser()
>> >   known_args, pipeline_args = parser.parse_known_args(argv)
>> >
>> >   pipeline_options = PipelineOptions(pipeline_args)
>> >   pipeline_options.view_as(SetupOptions).save_main_session = True
>> >
>> >   with beam.Pipeline(options=pipeline_options) as p:
>> >     (p | 'Create' >> beam.Create(['Hello'])
>> >        | 'Write' >> beam.io.WriteToText('/tmp'))
>> >
>> >
>> > if __name__ == '__main__':
>> >   logging.getLogger().setLevel(logging.INFO)
>> >   run()
>> >
>> > === requirements.txt ===
>> > alembic
>> >
>> > When I run the program:
>> >
>> > $ python3 main.py --runner=PortableRunner --job_endpoint=embed \
>> >     --requirements_file=requirements.txt
>> >
>> >
>> > I get some normal output and then:
>> >
>> > INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>> > File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>> > line 261, in unpack_file\n untar_file(filename, location)\n File
>> > "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>> > line 177, in untar_file\n tar = tarfile.open(filename, mode)\n File
>> > "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n return
>> > func(name, filemode, fileobj, **kwargs)\n File
>> > "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n raise
>> > ReadError("not a gzip file")\ntarfile.ReadError: not a gzip file\n
>> > 2020/08/08 01:17:07 Failed to install required packages: failed to
>> > install requirements: exit status 2\n'
>> >
>> > This greatly puzzled me and, after some looking, I found something
>> > really surprising. Here is the package in the /directory to be staged/:
>> >
>> > $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>> > ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last
>> > modified: Thu Mar 19 21:48:31 2020, max compression, original size
>> > modulo 2^32 4730880
>> > $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>> > -rw-r--r-- 1 jkff staff 1092045 Aug 7 16:56 ...
>> >
>> > So far so good. But here is the same file inside the Docker container
>> > (I ssh'd into the dead container
>> > <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>> >
>> > # file /tmp/staged/alembic-1.4.2.tar.gz
>> > /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>> > # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>> > -rwxr-xr-x 1 root root 4730880 Aug 8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>> >
>> > The file has clearly been unzipped and now of course
>> > pip can't install it! What's going on here? Am I
>> > using the direct/portable runner combination wrong?
>> >
>> > Thanks!
>> >
>> > --
>> > Eugene Kirpichov
>> > http://www.linkedin.com/in/eugenekirpichov
>> >
>> >
>> >
>> > --
>> > Eugene Kirpichov
>> > http://www.linkedin.com/in/eugenekirpichov
>> >
>> >
>> >
>> > --
>> > Eugene Kirpichov
>> > http://www.linkedin.com/in/eugenekirpichov
>> >
>> >
>> >
>> > --
>> > Eugene Kirpichov
>> > http://www.linkedin.com/in/eugenekirpichov
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>
--
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov