Do you see any warnings "Failed to import beam plugin ..." in your logs? If so, that may indicate some kind of packaging issue.
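Just to make sure we're talking about the same setup, the sketch below is roughly what I'd expect on your side: the package that defines the custom filesystem is imported somewhere in the pipeline code and shipped to the workers via '--extra_package'. The package/module names and the tarball path here are only placeholders, and whether that import actually happens on the Dataflow workers is exactly the open question discussed below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder package/module: importing it is what makes the FileSystem
# subclass visible to Beam's subclass-based discovery; without the import
# FileSystems.get_filesystem() can never find it.
import my_s3_fs.s3filesystem  # noqa: F401

options = PipelineOptions([
    '--runner=DataflowRunner',
    # Stages the source distribution so it gets pip-installed on the workers.
    '--extra_package=dist/my_s3_fs-0.1.0.tar.gz',
    # ... plus the usual --project, --staging_location, --temp_location, etc.
])

p = beam.Pipeline(options=options)
p | 'Read' >> beam.io.ReadFromText('s3://pm-dataflow/test')
p.run()

Note that '--extra_package' only installs the package on the workers; it does not by itself cause anything to import it there.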
On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]> wrote:
> Discovery of file-system implementations should only occur after your
> package is installed. But your class won't be discoverable until it is
> imported somewhere.
>
> I agree that we need a better mechanism for registering and discovering
> new FileSystems. Would you mind filing a JIRA ticket for that?
>
> - Cham
>
>
> On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]> wrote:
>>
>> Thanks for the clarification, Cham!
>>
>> Does it mean that for now I'm pretty much stuck with a modified Beam
>> distribution, unless/until that filesystem is a part of Beam, or until
>> there's a better mechanism to discover new filesystem classes?
>>
>> Alternatively, maybe it's still possible to change the order of imports
>> somehow, so that we initialize the FileSystems class after all the side
>> packages are imported? That would be an even better solution, although
>> it may not be possible due to circular dependency issues or whatnot.
>>
>> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]> wrote:
>>>
>>> You should be able to use a locally modified version of Beam as follows:
>>>
>>> python setup.py sdist
>>>
>>> Then use the option '--sdk_location dist/<src dist>' when running your
>>> program.
>>>
>>> Beam currently looks for FileSystem implementations using the sub-class
>>> check below:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>>>
>>> For this to work you need to force import your FileSystem, similar to
>>> the following:
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]> wrote:
>>>>
>>>> Hey folks,
>>>>
>>>> So, I've been trying to run this custom S3 filesystem of mine on the
>>>> Dataflow runner. It works fine on the direct runner and even produces
>>>> the correct results. :)
>>>>
>>>> However, when I run it on Dataflow (it's basically a wordcount example
>>>> that has the gs:// paths replaced with s3:// paths), I see the
>>>> following errors:
>>>>
>>>> (b84963e5767747e2): Traceback (most recent call last):
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>>>     work_executor.execute()
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>>>     self._split_task)
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>>>     desired_bundle_size)
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>>>     for split in source.split(desired_bundle_size):
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>>>     return self._get_concat_source().split(
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>>>     return fnc(self, *args, **kwargs)
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>>>     match_result = FileSystems.match([pattern])[0]
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>>>
>>>> To my understanding, this means that the module that contains my
>>>> S3FileSystem doesn't get imported (it currently resides in a separate
>>>> package that I ship to the pipeline). And, I guess, the order of module
>>>> imports may differ depending on the runner; that's the only explanation
>>>> of why exactly the same pipeline works on the direct runner and
>>>> produces the correct results.
>>>>
>>>> Any ideas of the best way to troubleshoot this? Can I somehow make Beam
>>>> upload some locally-modified version of the apache_beam package to
>>>> Dataflow? I'd be happy to provide more details if needed. Or should I
>>>> instead be reaching out to the Dataflow team?
>>>>
>>>> Thank you.
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
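P.S. For anyone who finds this thread later: the sub-class check linked above means the discovery hook is essentially just the scheme() classmethod on a FileSystem subclass. The skeleton below is purely illustrative (a real implementation also has to provide match, create, open, copy, rename, delete and the rest of the abstract methods), but it shows the part the lookup depends on:

from apache_beam.io.filesystem import FileSystem


class S3FileSystem(FileSystem):
  """Illustrative skeleton of a custom filesystem for s3:// paths."""

  @classmethod
  def scheme(cls):
    # FileSystems.get_filesystem() walks the FileSystem subclasses and picks
    # the one whose scheme() matches the path prefix, so this is what ties
    # the class to s3:// paths -- but only if the module defining it has
    # actually been imported, which is the crux of the problem above.
    return 's3'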
