JB, for your second point it seems you may not be setting the Hadoop configuration on HadoopFileSystemOptions. Also, I just merged https://github.com/apache/beam/pull/2890, which will auto-detect the Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR environment variables.
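For reference, a minimal sketch of what I mean by setting the Hadoop configuration on the options — the HadoopFileSystemOptions class and its setHdfsConfiguration method are from beam-sdks-java-io-hadoop-file-system; the fs.defaultFS value and the main-class name are just placeholders for illustration:

```java
import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class HdfsOptionsSketch {
  public static void main(String[] args) {
    // Parse pipeline options as HadoopFileSystemOptions so the HDFS
    // filesystem registrar can pick up a Hadoop Configuration.
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

    // Explicitly wire in a Configuration. Normally this would come from
    // core-site.xml / hdfs-site.xml (or, after PR 2890, be auto-detected
    // from HADOOP_CONF_DIR / YARN_CONF_DIR). "hdfs://localhost:8020" is
    // an assumed address for a local HDFS.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:8020");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline, e.g. TextIO.write().to("hdfs://...") ...
    // pipeline.run().waitUntilFinish();
  }
}
```

Without the configuration (or the environment variables), FileSystems cannot resolve the "hdfs" scheme, which would explain the "Unable to find registrar for hdfs" exception below.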
On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi guys,
>
> One of the key refactorings/new features we bring in the first stable
> release is the "new" Beam filesystems.
>
> I started to play with it on a couple of use cases I have in beam-samples.
>
> 1/ TextIO.write() with unbounded PCollection (stream)
>
> The first use case is TextIO write with an unbounded PCollection (good
> timing, as we had a question about this yesterday on Slack).
>
> I confirm that TextIO now supports unbounded PCollections. You have to
> create a Window and "flag" TextIO to use windowing.
>
> Here's the code snippet:
>
>     pipeline
>         .apply(JmsIO.read()
>             .withConnectionFactory(connectionFactory)
>             .withQueue("BEAM"))
>         .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
>             public String apply(JmsRecord input) {
>                 return input.getPayload();
>             }
>         }))
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
>         .apply(TextIO.write()
>             .to("/home/jbonofre/demo/beam/output/uc2")
>             .withWindowedWrites()
>             .withNumShards(3));
>
> Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses the
> JMS ack to advance the watermark, it should be client ack, not auto ack).
> I'm preparing a PR for JmsIO about this.
> However, the "windowed" TextIO works fine.
>
> 2/ Beam HDFS filesystem
>
> The other use case is to use the "new" Beam filesystem with TextIO,
> especially HDFS.
> So, in my pipeline, I'm using:
>
>     .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
>
> In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client
> dependencies:
>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>       <version>0.7.0-SNAPSHOT</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.hadoop</groupId>
>       <artifactId>hadoop-client</artifactId>
>       <version>2.7.3</version>
>     </dependency>
>
> Unfortunately, when starting the pipeline, I get:
>
>     Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
>         at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
>         at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
>         at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
>
> I'm going to investigate tonight and I will let you know.
>
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com