On Fri, Jul 7, 2017 at 6:26 PM, Dmitry Demeshchuk <[email protected]>
wrote:

> @Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573
> with some initial context.
>
> @Robert: I'm not seeing any errors like that in the Stackdriver logs, but
> I'll keep an eye on them, just in case. In fact, up until now packaging has
> worked fine for me: I've been able to install custom apt packages on the
> Dataflow nodes and even get our internal pip packages shipped there and
> properly available on the workers.
>
> @Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and
> all of my code runs from a separate package that gets installed using a
> custom --setup_file argument. But I'll try just using head and see if it
> helps. I don't fully understand the notion of plugins, though. Is that just
> an arbitrary Python package, or is it supposed to be an addition to the
> apache_beam package, akin to "apache_beam[gcp]"?
>

An arbitrary package. It is a mechanism for importing modules before any
other code executes. This flag is likely the existing solution for BEAM-2573.
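For concreteness, a hypothetical invocation might look roughly like the
following (the module/class path and pipeline file are placeholders for
wherever your S3 filesystem actually lives, assuming the flag accepts a
fully-qualified import path):

    python my_pipeline.py \
        --runner DataflowRunner \
        --setup_file ./setup.py \
        --beam_plugin my_s3_package.s3filesystem.S3FileSystem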



>
> On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
>
>> Dmitry,
>>
>> You can use the --beam_plugin flag [1] to import your plugin before any
>> other code execution. A failure in this step will result in the warning
>> Robert mentioned ("Failed to import beam plugin ..."), and you can look for
>> those in worker logs.
>>
>> This flag is not available in the 2.0.0 release and will be included in
>> the next release. Since you are building the SDK from head, it should
>> already be available to you.
>>
>> Ahmet
>>
>> [1] https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
>>
>> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Do you see any warnings "Failed to import beam plugin ..." in your
>>> logs? If so, that may indicate some kind of packaging issue.
>>>
>>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]>
>>> wrote:
>>> > Discovery of file-system implementations should only occur after your
>>> > package is installed. But your class won't be discoverable until it is
>>> > imported somewhere.
>>> >
>>> > I agree that we need a better mechanism for registering and discovering
>>> > new FileSystems. Would you mind filing a JIRA ticket for that?
>>> >
>>> > - Cham
>>> >
>>> >
>>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]>
>>> > wrote:
>>> >>
>>> >> Thanks for the clarification, Cham!
>>> >>
>>> >> Does it mean that for now I'm pretty much stuck with a modified Beam
>>> >> distribution, unless/until that filesystem is a part of Beam, or until
>>> >> there's a better mechanism to discover new filesystem classes?
>>> >>
>>> >> Alternatively, maybe it's still possible to change the order of imports
>>> >> somehow, so that we initialize the FileSystems class after all the side
>>> >> packages are imported? That would be an even better solution, although
>>> >> it may not be possible due to circular dependency issues or whatnot.
>>> >>
>>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]>
>>> >> wrote:
>>> >>>
>>> >>> You should be able to use a locally modified version of Beam as
>>> >>> follows. First build a source distribution:
>>> >>>
>>> >>> python setup.py sdist
>>> >>>
>>> >>> Then pass the option '--sdk_location dist/<src dist>' when running
>>> >>> your program.
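>>> >>> For example, assuming the sdist step produced a tarball named like the
>>> >>> one below (the exact name depends on the version you build, and
>>> >>> my_pipeline.py stands in for your own entry point):
>>> >>>
>>> >>> python my_pipeline.py --runner DataflowRunner \
>>> >>>     --sdk_location dist/apache-beam-2.1.0.dev0.tar.gz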
>>> >>>
>>> >>> Beam currently looks for FileSystem implementations using the
>>> >>> sub-class check below.
>>> >>>
>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
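>>> >>> Roughly, the lookup amounts to something like this (a paraphrase of
>>> >>> the idea, not the exact Beam source):
>>> >>>
>>> >>> from apache_beam.io.filesystem import FileSystem
>>> >>>
>>> >>> def _lookup_filesystem(path):
>>> >>>   # Derive the scheme ('s3', 'gs', ...) from the path; no scheme means local.
>>> >>>   scheme = path.split('://', 1)[0] if '://' in path else 'file'
>>> >>>   matches = [fs for fs in FileSystem.__subclasses__()
>>> >>>              if fs.scheme() == scheme]
>>> >>>   if not matches:
>>> >>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>> >>>   return matches[0]
>>> >>>
>>> >>> So a subclass that was never imported simply isn't present in
>>> >>> FileSystem.__subclasses__(), and the scheme can't be resolved.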
>>> >>>
>>> >>> For this to work you need to force-import your FileSystem, similar to
>>> >>> the following.
>>> >>>
>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
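>>> >>> In other words, something along these lines would need to run in the
>>> >>> worker process before the first FileSystems call (the package and
>>> >>> module names below are placeholders for your own):
>>> >>>
>>> >>> # Force-import the custom filesystem so it registers itself as a
>>> >>> # FileSystem subclass; tolerate its absence where it isn't installed.
>>> >>> try:
>>> >>>   from my_s3_package.s3filesystem import S3FileSystem  # hypothetical names
>>> >>> except ImportError:
>>> >>>   pass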
>>> >>>
>>> >>> Thanks,
>>> >>> Cham
>>> >>>
>>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
>>> >>> wrote:
>>> >>>>
>>> >>>> Hey folks,
>>> >>>>
>>> >>>> So, I've been trying to run this custom S3 filesystem of mine on the
>>> >>>> Dataflow runner. It works fine on the direct runner and even produces
>>> >>>> the correct results. :)
>>> >>>>
>>> >>>> However, when I run it on Dataflow (it's basically a wordcount example
>>> >>>> that has the gs:// paths replaced with s3:// paths), I see the
>>> >>>> following errors:
>>> >>>>
>>> >>>>
>>> >>>> (b84963e5767747e2): Traceback (most recent call last):
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>> >>>>     work_executor.execute()
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>> >>>>     self._split_task)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>> >>>>     desired_bundle_size)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>> >>>>     for split in source.split(desired_bundle_size):
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>> >>>>     return self._get_concat_source().split(
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>> >>>>     return fnc(self, *args, **kwargs)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>> >>>>     match_result = FileSystems.match([pattern])[0]
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>> >>>>
>>> >>>>
>>> >>>> To my understanding, this means that the module that contains my
>>> >>>> S3FileSystem doesn't get imported (it currently resides in a separate
>>> >>>> package that I ship to the pipeline). I guess the order in which
>>> >>>> modules are imported may differ depending on the runner; that's the
>>> >>>> only explanation for why exactly the same pipeline works on the direct
>>> >>>> runner and produces the correct results.
>>> >>>>
>>> >>>> Any ideas on the best way to troubleshoot this? Can I somehow make
>>> >>>> Beam upload a locally modified version of the apache_beam package to
>>> >>>> Dataflow? I'd be happy to provide more details if needed. Or should I
>>> >>>> instead be reaching out to the Dataflow team?
>>> >>>>
>>> >>>> Thank you.
>>> >>>>
>>> >>>>
>>> >>>> --
>>> >>>> Best regards,
>>> >>>> Dmitry Demeshchuk.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Best regards,
>>> >> Dmitry Demeshchuk.
>>>
>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>
