You should be able to use a locally modified version of Beam using
following.

python setup.py sdist
When running your program use option '--sdk_location dist/<src dist>' when
running your program.

Beam currently look for FileSystem implementations using the sub-class
check below.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58

For this to work you need to force import your FileSystem similar to
following.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30

Thanks,
Cham

On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
wrote:

> Hey folks,
>
> So, I've been trying to run this custom S3 filesystem of mine on the
> Dataflow runner. Works fine on the direct runner, even produces the correct
> results. :)
>
> However, when I run it on Dataflow (it's basically a wordcount example
> that has the gs:// paths replaced with s3:// paths), I see the following
> errors:
>
>
> (b84963e5767747e2): Traceback (most recent call last): File
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py",
> line 581, in do_work work_executor.execute() File
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line
> 209, in execute self._split_task) File
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line
> 217, in _perform_source_split_considering_api_limits desired_bundle_size)
> File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
> line 254, in _perform_source_split for split in
> source.split(desired_bundle_size): File
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py",
> line 172, in split return self._get_concat_source().split( File
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
> line 109, in _f return fnc(self, *args, **kwargs) File
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py",
> line 119, in _get_concat_source match_result =
> FileSystems.match([pattern])[0] File
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
> line 130, in match filesystem = FileSystems.get_filesystem(patterns[0])
> File
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
> line 61, in get_filesystem raise ValueError('Unable to get the Filesystem
> for path %s' % path) ValueError: Unable to get the Filesystem for path
> s3://pm-dataflow/test
>
>
> To my understanding, this means that the module that contains my
> S3FileSystem doesn't get imported (it currently resides in a separate
> package that I ship to the pipeline). And, I guess, the order of modules
> importing may differ depending on the runner, that's the only explanation
> of why exactly the same pipeline works on the direct runner and produces
> the correct results.
>
> Any ideas of the best way to troubleshoot this? Can I somehow make Beam
> upload some locally-modified version of the apache_beam package to
> Dataflow? I'd be happy to provide more details if needed. Or should I
> instead be reaching out to the Dataflow team?
>
> Thank you.
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

Reply via email to