Hi guys,

Thanks Luke, I updated my pipeline like this:

        HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(HadoopFileSystemOptions.class);
        HadoopFileSystemOptions.ConfigurationLocator locator =
                new HadoopFileSystemOptions.ConfigurationLocator();
        List<Configuration> configurations = locator.create(options);
        Pipeline pipeline = Pipeline.create(options);
        ...
        pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

I defined the HADOOP_CONF_DIR environment variable to point to the folder containing my hdfs-site.xml, and it works fine.
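
For anyone who wants to sanity-check their environment before launching the pipeline, here is a minimal sketch using only the JDK. It is not part of Beam: the class name HadoopConfCheck and the method missingConfigFiles are hypothetical, and looking for core-site.xml in addition to hdfs-site.xml is my assumption about a typical Hadoop conf folder.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Beam or Hadoop class.
final class HadoopConfCheck {

    // Returns the names (from fileNames) that are not regular files in confDir.
    static List<String> missingConfigFiles(Path confDir, String... fileNames) {
        List<String> missing = new ArrayList<>();
        for (String name : fileNames) {
            if (!Files.isRegularFile(confDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String confDir = System.getenv("HADOOP_CONF_DIR");
        if (confDir == null || confDir.isEmpty()) {
            System.err.println("HADOOP_CONF_DIR is not set");
            return;
        }
        // hdfs-site.xml is the file mentioned above; core-site.xml is an assumption.
        List<String> missing =
                missingConfigFiles(Paths.get(confDir), "hdfs-site.xml", "core-site.xml");
        if (missing.isEmpty()) {
            System.out.println("Found the expected files in " + confDir);
        } else {
            System.out.println("Not found in " + confDir + ": " + missing);
        }
    }
}
```

A file reported as not found is only a hint: in my case a folder with just hdfs-site.xml was enough.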

I saw that the README.md in hadoop-file-system is not up to date. I'm preparing a PR about that, and I'm also preparing some quick documentation about HDFS support.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
JB, for your second point it seems as though you may not be setting the Hadoop
configuration on HadoopFileSystemOptions.
Also, I just merged https://github.com/apache/beam/pull/2890 which will
auto-detect the Hadoop configuration based upon your HADOOP_CONF_DIR and
YARN_CONF_DIR environment variables.

On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <[email protected]> wrote:

    Hi guys,

    One of the key refactorings/new features we bring in the first stable
    release is the "new" Beam filesystems.

    I started to play with it on a couple of use cases I have in beam-samples.

    1/ TextIO.write() with unbounded PCollection (stream)

    The first use case is a TextIO write with an unbounded PCollection (good
    timing, as we had a question about this on Slack yesterday).

    I can confirm that TextIO now supports unbounded PCollections. You have to
    create a Window and "flag" TextIO to use windowed writes.

    Here's the code snippet:

    pipeline
            .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
            .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
                public String apply(JmsRecord input) {
                    return input.getPayload();
                }
            }))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
            .apply(TextIO.write()
                    .to("/home/jbonofre/demo/beam/output/uc2")
                    .withWindowedWrites()
                    .withNumShards(3));
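
To make the 10-second fixed windows in the snippet above a bit more concrete, here is a small JDK-only sketch of the arithmetic behind fixed windowing. This is not Beam's actual code: the class FixedWindowSketch and the method windowStartMillis are hypothetical, and it assumes windows aligned to the epoch with no offset.

```java
// Hypothetical illustration of fixed-window assignment; not a Beam class.
final class FixedWindowSketch {

    // Start of the fixed window of the given size that contains the timestamp.
    // Assumes windows are aligned to the epoch (no offset).
    static long windowStartMillis(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds, as in the snippet above
        long[] timestamps = {0L, 9_999L, 10_000L, 25_000L};
        for (long t : timestamps) {
            long start = windowStartMillis(t, size);
            System.out.println(t + " ms -> window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Elements whose timestamps fall in the same interval end up in the same window, and with windowed writes each window is written out separately.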

    Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses the
    JMS ack to advance the watermark, it should not be auto but client ack). I'm
    preparing a PR for JmsIO about this.
    However the "windowed" TextIO works fine.

    2/ Beam HDFS filesystem

    The other use case is to use the "new" Beam filesystem with TextIO,
    especially HDFS.

    So, in my pipeline, I'm using:

            .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));

    In my pom.xml, I define both Beam hadoop-file-system and hadoop-client
    dependencies:

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
                <version>0.7.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>

    Unfortunately, when starting the pipeline, I get:

    Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
            at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
            at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
            at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
            at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
            at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)

    I'm going to investigate tonight and will let you know.

    Regards
    JB
    --
    Jean-Baptiste Onofré
    [email protected]
    http://blog.nanthrax.net
    Talend - http://www.talend.com



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
