You should be able to use a locally modified version of Beam as follows: build a source distribution with 'python setup.py sdist', then pass the option '--sdk_location dist/<src dist>' when running your program.
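For example, the full sequence might look roughly like the sketch below. This is only an illustration: the pipeline script name, GCP project, GCS buckets, and the exact tarball name under dist/ are hypothetical and will differ in your setup.

    # Build a source distribution of the locally modified Beam Python SDK
    # (run from the directory that contains its setup.py).
    python setup.py sdist

    # Run the pipeline on Dataflow, pointing --sdk_location at the tarball
    # that the sdist step produced.
    python my_wordcount.py \
      --runner DataflowRunner \
      --project my-gcp-project \
      --staging_location gs://my-bucket/staging \
      --temp_location gs://my-bucket/temp \
      --sdk_location dist/apache-beam-<version>.tar.gz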
Beam currently looks for FileSystem implementations using the sub-class check below.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58

For this to work you need to force-import your FileSystem, similar to the following (a rough sketch of such a force-import is included after the quoted message below).
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30

Thanks,
Cham

On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]> wrote:

> Hey folks,
>
> So, I've been trying to run this custom S3 filesystem of mine on the
> Dataflow runner. Works fine on the direct runner, even produces the
> correct results. :)
>
> However, when I run it on Dataflow (it's basically a wordcount example
> that has the gs:// paths replaced with s3:// paths), I see the following
> errors:
>
> (b84963e5767747e2): Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>     self._split_task)
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>     desired_bundle_size)
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>     for split in source.split(desired_bundle_size):
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>     return self._get_concat_source().split(
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>     match_result = FileSystems.match([pattern])[0]
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>     filesystem = FileSystems.get_filesystem(patterns[0])
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>
> To my understanding, this means that the module that contains my
> S3FileSystem doesn't get imported (it currently resides in a separate
> package that I ship to the pipeline). And, I guess, the order of module
> imports may differ depending on the runner; that's the only explanation
> of why exactly the same pipeline works on the direct runner and produces
> the correct results.
>
> Any ideas of the best way to troubleshoot this? Can I somehow make Beam
> upload some locally-modified version of the apache_beam package to
> Dataflow? I'd be happy to provide more details if needed. Or should I
> instead be reaching out to the Dataflow team?
>
> Thank you.
>
> --
> Best regards,
> Dmitry Demeshchuk.
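As mentioned above, the force-import would go into your locally modified copy of apache_beam/io/filesystems.py, next to the existing LocalFileSystem/GCSFileSystem imports. A minimal sketch, assuming the S3 implementation lives in a module named my_s3_package.s3filesystem (both the module path and the class name are hypothetical):

    # In the locally modified apache_beam/io/filesystems.py:
    # import the custom FileSystem so that it becomes a known subclass of
    # FileSystem and the sub-class check in FileSystems.get_filesystem()
    # can resolve s3:// paths to it.
    try:
      from my_s3_package.s3filesystem import S3FileSystem  # pylint: disable=unused-import
    except ImportError:
      pass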
