On Fri, Jul 7, 2017 at 6:26 PM, Dmitry Demeshchuk <[email protected]> wrote:
> @Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573
> with some initial context.
>
> @Robert: I'm not seeing any errors like that in the Stackdriver logs, but
> I'll keep an eye on them, just in case. In fact, up until now I've been
> able to do a lot of things with packaging just fine: install custom apt
> packages on the Dataflow nodes, and even get our internal pip packages
> shipped there and properly available on the Dataflow nodes.
>
> @Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and
> all of my code runs from a separate package that gets installed using a
> custom --setup_file argument. But I'll try just using head and see if it
> helps. I don't fully understand the notion of plugins, though. Is that
> just an arbitrary Python package, or is it supposed to be an addition to
> the apache_beam package, akin to "apache_beam[gcp]"?

Arbitrary package. It is a mechanism for importing modules before executing
anything else. This flag is likely the existing solution for BEAM-2573.

> On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
>
>> Dmitry,
>>
>> You can use the --beam_plugin flag [1] to import your plugin before any
>> other code execution. A failure in this step will result in the warning
>> Robert mentioned ("Failed to import beam plugin ..."), and you can look
>> for those in the worker logs.
>>
>> This flag is not available in the 2.0.0 release but will be available in
>> the next one. Since you are building the SDK from head, it should
>> already be available to you.
>>
>> Ahmet
>>
>> [1] https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
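
To make the plugin mechanism concrete, here is a minimal sketch of what
such an importable module might look like (the my_s3_package module and
the S3FileSystem class are hypothetical; the package itself still has to
be shipped to the workers, e.g. via --setup_file):

    # my_s3_package/s3filesystem.py -- hypothetical plugin module.
    #
    # Importing this module is all the registration Beam needs:
    # FileSystem implementations are discovered by walking the
    # subclasses of apache_beam.io.filesystem.FileSystem, so the class
    # definition only has to have been executed before the first
    # FileSystems.match() (or similar) call runs on the worker.
    from apache_beam.io.filesystem import FileSystem


    class S3FileSystem(FileSystem):
      """Toy S3 filesystem; only the scheme lookup is sketched here."""

      @classmethod
      def scheme(cls):
        # The URL scheme this filesystem claims, i.e. s3://bucket/key paths.
        return 's3'

      # A real implementation must also override the remaining abstract
      # methods of FileSystem (create, open, delete, and so on).

The worker would then import it before executing any pipeline code via
something like --beam_plugin my_s3_package.s3filesystem (again, a
hypothetical import path).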
>> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Do you see any warnings "Failed to import beam plugin ..." in your
>>> logs? If so, that may indicate some kind of packaging issue.
>>>
>>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]>
>>> wrote:
>>>
>>> > Discovery of file-system implementations should only occur after your
>>> > package is installed. But your class won't be discoverable until it
>>> > is imported somewhere.
>>> >
>>> > I agree that we need a better mechanism for registering and
>>> > discovering new FileSystems. Would you mind filing a JIRA ticket for
>>> > that?
>>> >
>>> > - Cham
>>> >
>>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]>
>>> > wrote:
>>> >>
>>> >> Thanks for the clarification, Cham!
>>> >>
>>> >> Does it mean that for now I'm pretty much stuck with a modified Beam
>>> >> distribution, unless/until that filesystem is a part of Beam, or
>>> >> until there's a better mechanism to discover new filesystem classes?
>>> >>
>>> >> Alternatively, maybe it's still possible to change the order of
>>> >> imports somehow, so that we initialize the FileSystems class after
>>> >> all the side packages are imported? That would be an even better
>>> >> solution, although it may not be possible due to circular dependency
>>> >> issues or whatnot.
>>> >>
>>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath
>>> >> <[email protected]> wrote:
>>> >>>
>>> >>> You should be able to use a locally modified version of Beam as
>>> >>> follows. Build a source distribution with
>>> >>>
>>> >>> python setup.py sdist
>>> >>>
>>> >>> and then pass the option '--sdk_location dist/<src dist>' when
>>> >>> running your program.
>>> >>>
>>> >>> Beam currently looks for FileSystem implementations using the
>>> >>> sub-class check below.
>>> >>>
>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>>> >>>
>>> >>> For this to work, you need to force-import your FileSystem,
>>> >>> similar to the following.
>>> >>>
>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>>> >>>
>>> >>> Thanks,
>>> >>> Cham
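
For reference, the force-import pattern behind that second link looks
roughly like this; a locally modified filesystems.py would add the custom
filesystem alongside the built-in ones (the my_s3_package import is
hypothetical):

    # Near the top of apache_beam/io/filesystems.py: each FileSystem
    # implementation is imported purely for its side effect, so that the
    # FileSystem subclass check in get_filesystem() can see it. Optional
    # filesystems are wrapped in try/except ImportError so that a missing
    # dependency doesn't break the core package.
    from apache_beam.io.localfilesystem import LocalFileSystem  # pylint: disable=unused-import

    try:
      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem  # pylint: disable=unused-import
    except ImportError:
      pass

    # Hypothetical addition for the custom S3 filesystem:
    try:
      from my_s3_package.s3filesystem import S3FileSystem  # pylint: disable=unused-import
    except ImportError:
      pass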
>>> >>>
>>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
>>> >>> wrote:
>>> >>>>
>>> >>>> Hey folks,
>>> >>>>
>>> >>>> So, I've been trying to run this custom S3 filesystem of mine on
>>> >>>> the Dataflow runner. It works fine on the direct runner, and even
>>> >>>> produces the correct results. :)
>>> >>>>
>>> >>>> However, when I run it on Dataflow (it's basically a wordcount
>>> >>>> example with the gs:// paths replaced by s3:// paths), I see the
>>> >>>> following errors:
>>> >>>>
>>> >>>> (b84963e5767747e2): Traceback (most recent call last):
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>> >>>>     work_executor.execute()
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>> >>>>     self._split_task)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>> >>>>     desired_bundle_size)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>> >>>>     for split in source.split(desired_bundle_size):
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>> >>>>     return self._get_concat_source().split(
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>> >>>>     return fnc(self, *args, **kwargs)
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>> >>>>     match_result = FileSystems.match([pattern])[0]
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>> >>>>
>>> >>>> To my understanding, this means that the module containing my
>>> >>>> S3FileSystem doesn't get imported (it currently resides in a
>>> >>>> separate package that I ship to the pipeline). And, I guess, the
>>> >>>> order in which modules get imported may differ depending on the
>>> >>>> runner; that's the only explanation I have for why exactly the
>>> >>>> same pipeline works on the direct runner and produces the correct
>>> >>>> results.
>>> >>>>
>>> >>>> Any ideas on the best way to troubleshoot this? Can I somehow make
>>> >>>> Beam upload a locally modified version of the apache_beam package
>>> >>>> to Dataflow? I'd be happy to provide more details if needed. Or
>>> >>>> should I instead be reaching out to the Dataflow team?
>>> >>>>
>>> >>>> Thank you.
>>> >>>>
>>> >>>> --
>>> >>>> Best regards,
>>> >>>> Dmitry Demeshchuk.
>>> >>
>>> >> --
>>> >> Best regards,
>>> >> Dmitry Demeshchuk.
>
> --
> Best regards,
> Dmitry Demeshchuk.
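
Putting the thread's suggestions together, a Dataflow launch might look
roughly like the sketch below (the project, bucket, and my_s3_package
names are hypothetical; --setup_file ships the custom package and
--beam_plugin force-imports it on the workers):

    # Hypothetical launcher for a pipeline reading from and writing to S3.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-gcp-project',            # hypothetical
        '--temp_location=gs://my-bucket/tmp',  # hypothetical
        '--setup_file=./setup.py',
        '--beam_plugin=my_s3_package.s3filesystem',  # hypothetical import path
    ])

    with beam.Pipeline(options=options) as p:
      (p
       | 'Read' >> beam.io.ReadFromText('s3://pm-dataflow/test')
       | 'Write' >> beam.io.WriteToText('s3://pm-dataflow/output'))  # hypothetical output path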
