Yes, I forgot to update the thread: that's what I'm doing, and it works fine.

Thanks!
Regards
JB

On 05/05/2017 08:28 PM, Lukasz Cwik wrote:
JB, the ConfigurationLocator is the default instance factory for the
hdfsConfiguration so as long as HADOOP_CONF_DIR/YARN_CONF_DIR is correctly
specified, you should only need to write:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
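
For reference, a minimal self-contained sketch of that simplified main could look
like the following (the Create.of() source, the class name, and the
run()/waitUntilFinish() call are just illustrative placeholders, not part of the
snippet above):

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class HdfsWriteExample {
    public static void main(String[] args) {
        // The default ConfigurationLocator picks up HADOOP_CONF_DIR / YARN_CONF_DIR,
        // so no explicit Hadoop configuration code is needed here.
        HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(HadoopFileSystemOptions.class);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of(Arrays.asList("a", "b", "c")))
                .apply(TextIO.write().to("hdfs://localhost/path"));
        pipeline.run().waitUntilFinish();
    }
}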

On Fri, May 5, 2017 at 6:23 AM, Jean-Baptiste Onofré <[email protected]> wrote:

    Hi guys,

    thanks Luke, I updated my pipeline like this:

            HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                    .withValidation().as(HadoopFileSystemOptions.class);
            HadoopFileSystemOptions.ConfigurationLocator locator =
                    new HadoopFileSystemOptions.ConfigurationLocator();
            List<Configuration> configurations = locator.create(options);
            Pipeline pipeline = Pipeline.create(options);
            ...
            pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

    I defined the HADOOP_CONF_DIR env variable pointing to the folder where I
    have hdfs-site.xml, and it works fine.
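
    As an alternative to relying on the env variable, I guess the configuration
    could also be set explicitly on the options. A rough sketch (assuming the
    hdfsConfiguration setter on HadoopFileSystemOptions and an example
    fs.defaultFS value; same classes as in the snippet above, plus
    org.apache.hadoop.conf.Configuration and java.util.Collections):

            // Illustrative only: build a Hadoop Configuration by hand instead of
            // relying on HADOOP_CONF_DIR, and set it on the pipeline options.
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");

            HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                    .withValidation().as(HadoopFileSystemOptions.class);
            options.setHdfsConfiguration(Collections.singletonList(conf));

            Pipeline pipeline = Pipeline.create(options);
            pipeline.apply(TextIO.write().to("hdfs://localhost/path"));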

    I saw that the README.md in hadoop-file-system is not up to date; I'm
    preparing a PR about that, and I'm also preparing some quick documentation
    about HDFS support.

    Regards
    JB

    On 05/04/2017 06:07 PM, Lukasz Cwik wrote:

        JB, for your second point it seems as though you may not be setting the
        Hadoop configuration on HadoopFileSystemOptions.
        Also, I just merged https://github.com/apache/beam/pull/2890, which will
        auto-detect the Hadoop configuration based upon your HADOOP_CONF_DIR and
        YARN_CONF_DIR environment variables.

        On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <[email protected]> wrote:

            Hi guys,

            One of the key refactorings/new features we bring in the first stable
            release is the "new" Beam filesystems.

            I started to play with it on a couple of use cases I have in
            beam-samples.

            1/ TextIO.write() with unbounded PCollection (stream)

            The first use case is the TextIO write with an unbounded PCollection
            (good timing, as we had a question yesterday about this on Slack).

            I confirm that TextIO now supports unbounded PCollections. You have to
            create a Window and "flag" TextIO to use windowing.

            Here's the code snippet:

            pipeline
                .apply(JmsIO.read()
                        .withConnectionFactory(connectionFactory)
                        .withQueue("BEAM"))
                .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
                    public String apply(JmsRecord input) {
                        return input.getPayload();
                    }
                }))
                .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
                .apply(TextIO.write()
                        .to("/home/jbonofre/demo/beam/output/uc2")
                        .withWindowedWrites()
                        .withNumShards(3));

            Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses
            the JMS ack to advance the watermark, it should be the client ack, not
            the auto ack). I'm preparing a PR for JmsIO about this.
            However, the "windowed" TextIO works fine.
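
            For context on the two ack modes mentioned above, a rough illustration
            in plain javax.jms (not the actual JmsIO code):

                // AUTO_ACKNOWLEDGE: the broker marks the message consumed as soon as
                // receive() returns, before the runner has checkpointed the element.
                Session autoSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // CLIENT_ACKNOWLEDGE: the reader decides when to ack, e.g. once the
                // element is safely checkpointed, so the watermark can be advanced reliably.
                Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                Message message = clientSession.createConsumer(clientSession.createQueue("BEAM")).receive();
                // ... hand the payload to the pipeline, then:
                message.acknowledge();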

            2/ Beam HDFS filesystem

            The other use case is to use the "new" Beam filesystem with TextIO,
            especially HDFS.

            So, in my pipeline, I'm using:

                .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));

            In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client
            dependencies:

                    <dependency>
                        <groupId>org.apache.beam</groupId>
                        <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
                        <version>0.7.0-SNAPSHOT</version>
                    </dependency>
                    <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                        <version>2.7.3</version>
                    </dependency>

            Unfortunately, when starting the pipeline, I have:

            Exception in thread "main" java.lang.IllegalStateException: Unable to find
            registrar for hdfs
                    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
                    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
                    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
                    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
                    at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)

            I'm going to investigate tonight and I will let you know.

            Regards
            JB
            --
            Jean-Baptiste Onofré
            [email protected]
            http://blog.nanthrax.net
            Talend - http://www.talend.com



    --
    Jean-Baptiste Onofré
    [email protected]
    http://blog.nanthrax.net
    Talend - http://www.talend.com



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
