JB, for your second point, it seems as though you may not be setting the
Hadoop configuration on HadoopFileSystemOptions.
Also, I just merged https://github.com/apache/beam/pull/2890, which will
auto-detect the Hadoop configuration based on your HADOOP_CONF_DIR and
YARN_CONF_DIR environment variables.
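
For reference, here's a minimal sketch of what that could look like. The
fs.defaultFS value and the NameNode address are placeholders, not taken
from your pipeline:

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    // Parse the options as HadoopFileSystemOptions so the HDFS registrar
    // can find a Hadoop configuration at pipeline construction time.
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

    // Point fs.defaultFS at the NameNode (placeholder address).
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    Pipeline pipeline = Pipeline.create(options);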

On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi guys,
>
> One of the key refactorings/new features we bring in the first stable
> release is the "new" Beam filesystems.
>
> I started to play with it on a couple of use cases I have in beam-samples.
>
> 1/ TextIO.write() with unbounded PCollection (stream)
>
> The first use case is the TextIO write with an unbounded PCollection (good
> timing, as we had a question about this yesterday on Slack).
>
> I confirm that TextIO now supports unbounded PCollections. You have to
> apply a Window and "flag" TextIO to use windowed writes.
>
> Here's the code snippet:
>
> pipeline
>         .apply(JmsIO.read()
>                 .withConnectionFactory(connectionFactory)
>                 .withQueue("BEAM"))
>         .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
>             public String apply(JmsRecord input) {
>                 return input.getPayload();
>             }
>         }))
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
>         .apply(TextIO.write()
>                 .to("/home/jbonofre/demo/beam/output/uc2")
>                 .withWindowedWrites()
>                 .withNumShards(3));
>
> Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses the
> JMS ack to advance the watermark, it should use client acknowledgement
> rather than auto acknowledgement). I'm preparing a PR for JmsIO about this.
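>
> As a rough illustration in plain javax.jms (this is just the standard API,
> not the actual JmsIO code):
>
>     import javax.jms.Connection;
>     import javax.jms.Session;
>
>     // Assuming a Connection from the same connectionFactory as above.
>     Connection connection = connectionFactory.createConnection();
>
>     // AUTO_ACKNOWLEDGE: the provider acks on delivery, so the ack says
>     // nothing about whether the runner has actually processed the message.
>     Session autoAck = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>
>     // CLIENT_ACKNOWLEDGE: the consumer acks explicitly (message.acknowledge())
>     // once the element is safely checkpointed, so the ack can meaningfully
>     // advance the watermark.
>     Session clientAck = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);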
> However, the "windowed" TextIO works fine.
>
> 2/ Beam HDFS filesystem
>
> The other use case is using the "new" Beam filesystems with TextIO,
> especially HDFS.
>
> So, in my pipeline, I'm using:
>
>         .apply(TextIO.write()
>                 .to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
>
> In my pom.xml, I define both Beam hadoop-file-system and hadoop-client
> dependencies:
>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>             <version>0.7.0-SNAPSHOT</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.hadoop</groupId>
>             <artifactId>hadoop-client</artifactId>
>             <version>2.7.3</version>
>         </dependency>
>
> Unfortunately, when starting the pipeline, I get:
>
> Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
>         at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
>         at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
>         at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
>
> I'm going to investigate tonight and I'll let you know.
>
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
