Do you see any warnings "Failed to import beam plugin ..." in your
logs? If so, that may indicate some kind of packaging issue.

On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]> wrote:
> Discovery of file-system implementations should only occur after your
> package is installed. But your class won't be discoverable until it is
> imported somewhere.
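>
> As a stopgap, you can force that import yourself from your pipeline's
> main module. A minimal sketch, assuming a hypothetical package name
> 's3beam' for where your S3FileSystem subclass lives (whether the
> Dataflow worker actually executes this import is the open question in
> this thread):
>
>     # Importing the module for its side effect: defining the subclass
>     # is what makes it visible to Beam's discovery scan.
>     import s3beam.s3filesystem  # noqa: F401  (hypothetical module)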
>
> I agree that we need a better mechanism for registering and discovering new
> FileSystems. Would you mind filing a JIRA ticket for that?
>
> - Cham
>
>
> On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]>
> wrote:
>>
>> Thanks for the clarification, Cham!
>>
>> Does it mean that for now I'm pretty much stuck with a modified Beam
>> distribution, unless/until that filesystem is a part of Beam, or until
>> there's a better mechanism to discover new filesystem classes?
>>
>> Alternatively, maybe it's still possible to change the order of imports
>> somehow, so that we initialize the FileSystems class after all the side
>> packages are imported? That would be an even better solution, although it
>> may not be possible due to circular dependency issues or whatnot.
>>
>> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]>
>> wrote:
>>>
>>> You should be able to use a locally modified version of Beam as
>>> follows. First build a source distribution:
>>>
>>> python setup.py sdist
>>>
>>> Then pass the option '--sdk_location dist/<src dist>' when running
>>> your program.
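>>>
>>> For example, a minimal sketch of setting that option from code; the
>>> project, bucket, and sdist filename below are placeholders for your
>>> own values:
>>>
>>>     import apache_beam as beam
>>>     from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>     options = PipelineOptions([
>>>         '--runner=DataflowRunner',
>>>         '--project=my-project',                # placeholder
>>>         '--temp_location=gs://my-bucket/tmp',  # placeholder
>>>         '--sdk_location=dist/apache-beam-2.0.0.tar.gz',  # your sdist
>>>     ])
>>>
>>>     with beam.Pipeline(options=options) as p:
>>>         pass  # pipeline definition goes here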
>>>
>>> Beam currently looks for FileSystem implementations using the
>>> subclass check below.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
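>>>
>>> Paraphrased (not the exact code at that line), the check behaves
>>> roughly like this:
>>>
>>>     from apache_beam.io.filesystem import FileSystem
>>>
>>>     path = 's3://pm-dataflow/test'
>>>     path_scheme = path.split('://', 1)[0]
>>>
>>>     # Find the unique FileSystem subclass whose scheme() matches the
>>>     # path's scheme; if your class was never imported, it is absent
>>>     # from __subclasses__() and this lookup fails.
>>>     systems = [fs for fs in FileSystem.__subclasses__()
>>>                if fs.scheme() == path_scheme]
>>>     if len(systems) != 1:
>>>         raise ValueError('Unable to get the Filesystem for path %s' % path)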
>>>
>>> For this to work, you need to force-import your FileSystem, similar
>>> to the following.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
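>>>
>>> That line boils down to a guarded import, roughly:
>>>
>>>     # Importing the module defines the subclass, which is all the
>>>     # discovery mechanism needs; the try/except keeps core Beam
>>>     # usable when the GCP extras aren't installed.
>>>     try:
>>>         from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
>>>     except ImportError:
>>>         pass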
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
>>> wrote:
>>>>
>>>> Hey folks,
>>>>
>>>> So, I've been trying to run this custom S3 filesystem of mine on the
>>>> Dataflow runner. Works fine on the direct runner, even produces the correct
>>>> results. :)
>>>>
>>>> However, when I run it on Dataflow (it's basically a wordcount example
>>>> that has the gs:// paths replaced with s3:// paths), I see the following
>>>> errors:
>>>>
>>>>
>>>> (b84963e5767747e2): Traceback (most recent call last):
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>>>     work_executor.execute()
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>>>     self._split_task)
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>>>     desired_bundle_size)
>>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>>>     for split in source.split(desired_bundle_size):
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>>>     return self._get_concat_source().split(
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>>>     return fnc(self, *args, **kwargs)
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>>>     match_result = FileSystems.match([pattern])[0]
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>>>
>>>>
>>>> To my understanding, this means that the module that contains my
>>>> S3FileSystem doesn't get imported (it currently resides in a separate
>>>> package that I ship to the pipeline). And, I guess, the order of
>>>> module imports may differ depending on the runner; that's the only
>>>> explanation I have for why exactly the same pipeline works on the
>>>> direct runner and produces the correct results.
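>>>>
>>>> One hypothetical way to test that theory would be to log the
>>>> registered subclasses right before the read runs:
>>>>
>>>>     import logging
>>>>     from apache_beam.io.filesystem import FileSystem
>>>>
>>>>     # If S3FileSystem shows up here on the direct runner but not in
>>>>     # the Dataflow worker logs, the module was never imported there.
>>>>     logging.info('Registered filesystems: %s',
>>>>                  [fs.__name__ for fs in FileSystem.__subclasses__()])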
>>>>
>>>> Any ideas on the best way to troubleshoot this? Can I somehow make
>>>> Beam upload a locally modified version of the apache_beam package to
>>>> Dataflow? I'd be happy to provide more details if needed. Or should I
>>>> instead be reaching out to the Dataflow team?
>>>>
>>>> Thank you.
>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>
>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
