On Fri, Jul 7, 2017 at 6:33 PM, Ahmet Altay <[email protected]> wrote:
>
>
> On Fri, Jul 7, 2017 at 6:26 PM, Dmitry Demeshchuk <[email protected]>
> wrote:
>>
>> @Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573
>> with some initial context.
>>
>> @Robert: I'm not seeing any errors like that in the Stackdriver logs, but
>> I'll keep an eye on them, just in case. In fact, up until now I've been able
>> to do a lot of things with packaging just fine: install custom apt packages
>> on the Dataflow nodes, and even get our internal pip packages shipped
>> there and made available on the Dataflow nodes.
>>
>> @Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and
>> all of my code is being run from a separate package that gets installed
>> using a custom --setup_file argument. But I'll try to just use the head and
>> see if it helps. I don't fully understand the notion of plugins though. Is
>> that just an arbitrary Python package, or is it supposed to be an addition
>> to the apache_beam package, akin to "apache_beam[gcp]"?
>
>
> Arbitrary package. It is a mechanism to import modules before executing
> things. This flag is likely the existing solution for BEAM-2573.

Note that at beam head, any subclass of FileSystem that was imported
in your main module should be automatically added to this flag, i.e.
things should "just work" like they did in the direct runner.
Hopefully they do--let us know either way.
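
To illustrate why an un-imported subclass is invisible to this kind of
lookup, here is a minimal sketch. It is not Beam's actual code: FileSystem,
LocalFileSystem, get_filesystem and load_s3_plugin are simplified,
hypothetical stand-ins, and load_s3_plugin only plays the role of importing
the module that defines the custom filesystem.

```python
class FileSystem(object):
    """Hypothetical stand-in for a filesystem base class (not Beam's)."""

    @classmethod
    def scheme(cls):
        raise NotImplementedError


class LocalFileSystem(FileSystem):
    """Handles paths that carry no URI scheme."""

    @classmethod
    def scheme(cls):
        return None


def get_filesystem(path):
    """Return the FileSystem subclass whose scheme matches the path."""
    scheme = path.split('://', 1)[0] if '://' in path else None
    # Only classes that have already been defined (i.e. whose module has
    # been imported) show up in __subclasses__().
    matches = [fs for fs in FileSystem.__subclasses__()
               if fs.scheme() == scheme]
    if not matches:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
    return matches[0]


def load_s3_plugin():
    """Stands in for importing the module that defines the S3 filesystem."""

    class S3FileSystem(FileSystem):
        @classmethod
        def scheme(cls):
            return 's3'

    # The caller should keep this reference: __subclasses__() only holds
    # weak references to subclasses.
    return S3FileSystem
```

Calling get_filesystem('s3://pm-dataflow/test') before load_s3_plugin()
raises the same kind of ValueError as in the traceback quoted below; calling
it afterwards returns the S3 class. Importing the module early on the
workers is what the plugin flag is meant to achieve.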

>> On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
>>>
>>> Dmitry,
>>>
>>> You can use the --beam_plugin flag [1] to import your plugin before any
>>> other code execution. A failure in this step will result in the warning
>>> Robert mentioned ("Failed to import beam plugin ..."), and you can look for
>>> those in worker logs.
>>>
>>> This flag is not available in the 2.0.0 version and will be available in
>>> the next release. Since you are building the SDK from head, it should be
>>> already available to you.
>>>
>>> Ahmet
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
>>>
>>> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]>
>>> wrote:
>>>>
>>>> Do you see any warnings "Failed to import beam plugin ..." in your
>>>> logs? If so, that may indicate some kind of packaging issue.
>>>>
>>>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath
>>>> <[email protected]> wrote:
>>>> > Discovery of file-system implementations should only occur after your
>>>> > package is installed. But your class won't be discoverable until it is
>>>> > imported somewhere.
>>>> >
>>>> > I agree that we need a better mechanism for registering and
>>>> > discovering new FileSystems. Would you mind filing a JIRA ticket for
>>>> > that?
>>>> >
>>>> > - Cham
>>>> >
>>>> >
>>>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk
>>>> > <[email protected]>
>>>> > wrote:
>>>> >>
>>>> >> Thanks for the clarification, Cham!
>>>> >>
>>>> >> Does it mean that for now I'm pretty much stuck with a modified Beam
>>>> >> distribution, unless/until that filesystem is a part of Beam, or until
>>>> >> there's a better mechanism to discover new filesystem classes?
>>>> >>
>>>> >> Alternatively, maybe it's still possible to change the order of imports
>>>> >> somehow, so that we initialize the FileSystems class after all the side
>>>> >> packages are imported? That would be an even better solution, although
>>>> >> it may not be possible due to circular dependency issues or whatnot.
>>>> >>
>>>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath
>>>> >> <[email protected]>
>>>> >> wrote:
>>>> >>>
>>>> >>> You should be able to use a locally modified version of Beam as
>>>> >>> follows.
>>>> >>>
>>>> >>> python setup.py sdist
>>>> >>>
>>>> >>> Then pass the option '--sdk_location dist/<src dist>' when running
>>>> >>> your program.
>>>> >>>
>>>> >>> Beam currently looks for FileSystem implementations using the
>>>> >>> subclass check below.
>>>> >>>
>>>> >>>
>>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>>>> >>>
>>>> >>> For this to work, you need to force an import of your FileSystem,
>>>> >>> similar to the following.
>>>> >>>
>>>> >>>
>>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Cham
>>>> >>>
>>>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk
>>>> >>> <[email protected]>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Hey folks,
>>>> >>>>
>>>> >>>> So, I've been trying to run this custom S3 filesystem of mine on the
>>>> >>>> Dataflow runner. Works fine on the direct runner, even produces the
>>>> >>>> correct results. :)
>>>> >>>>
>>>> >>>> However, when I run it on Dataflow (it's basically a wordcount example
>>>> >>>> that has the gs:// paths replaced with s3:// paths), I see the
>>>> >>>> following errors:
>>>> >>>>
>>>> >>>>
>>>> >>>> (b84963e5767747e2): Traceback (most recent call last):
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>>> >>>>     work_executor.execute()
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>>> >>>>     self._split_task)
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>>> >>>>     desired_bundle_size)
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>>> >>>>     for split in source.split(desired_bundle_size):
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>>> >>>>     return self._get_concat_source().split(
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>>> >>>>     return fnc(self, *args, **kwargs)
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>>> >>>>     match_result = FileSystems.match([pattern])[0]
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>>> >>>>
>>>> >>>>
>>>> >>>> To my understanding, this means that the module that contains my
>>>> >>>> S3FileSystem doesn't get imported (it currently resides in a separate
>>>> >>>> package that I ship to the pipeline). And, I guess, the order in which
>>>> >>>> modules are imported may differ depending on the runner; that's the
>>>> >>>> only explanation I have for why exactly the same pipeline works on the
>>>> >>>> direct runner and produces the correct results.
>>>> >>>>
>>>> >>>> Any ideas on the best way to troubleshoot this? Can I somehow make Beam
>>>> >>>> upload a locally modified version of the apache_beam package to
>>>> >>>> Dataflow? I'd be happy to provide more details if needed. Or should I
>>>> >>>> instead be reaching out to the Dataflow team?
>>>> >>>>
>>>> >>>> Thank you.
>>>> >>>>
>>>> >>>>
>>>> >>>> --
>>>> >>>> Best regards,
>>>> >>>> Dmitry Demeshchuk.
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Best regards,
>>>> >> Dmitry Demeshchuk.
>>>
>>>
>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>
>
