I am able to read from and write to HDFS successfully. Here is my code snippet:

       p.apply("ReadFromHDFS",
Read.from(HDFSFileSource.from(options.getInputFile(),
TextInputFormat.class, LongWritable.class, Text.class)))
        .apply("ExtractPayload", ParDo.of(new ExtractString()))
        .apply(new CountWords())
        .apply("WriteToHDFS", Write.to(new
HDFSFileSink(options.getOutput(), TextOutputFormat.class)));

One observation is that the output is one file per word. How can I store
all the data in a single file and avoid creating one file per word?
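
I was wondering whether limiting the shard count on the Write transform
would force a single output file. Something like the sketch below (just a
guess on my side: I am not sure which SDK version adds withNumShards on
Write, or whether HDFSFileSink honors fixed sharding):

        .apply("WriteToHDFS",
                Write.to(new HDFSFileSink(options.getOutput(),
                        TextOutputFormat.class))
                        // one shard => one output file, at the cost of
                        // losing parallel writes
                        .withNumShards(1));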

Regards,
Sandeep

On Sat, Nov 26, 2016 at 9:45 AM, Sandeep Deshmukh <[email protected]>
wrote:

> Thanks. It helped. I faced serialisation issues, so I moved the DoFn code
> into a static nested class and it worked.
>
> I used the following imports:
>
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>
> At least I am now able to move on with the input side; I will check the
> output side too.
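>
> For reference, my static class looks roughly like this (a sketch based on
> the DoFn from Ismaël's snippet below):
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
>
> // A static nested class carries no hidden reference to the enclosing
> // class, so Beam can serialize the DoFn on its own.
> static class ExtractString extends DoFn<KV<LongWritable, Text>, String> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         c.output(c.element().getValue().toString());
>     }
> }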
>
> Thanks & Regards,
> Sandeep
>
> On Sat, Nov 26, 2016 at 1:35 AM, Ismaël Mejía <[email protected]> wrote:
>
>> Hello,
>>
>> I achieved this (reading a text file from HDFSFileSource) like this:
>>
>>         PCollection<String> data = pipeline
>>                 .apply("ReadFromHDFS",
>>                         Read.from(HDFSFileSource.from(options.getInput(),
>>                                 TextInputFormat.class,
>>                                 LongWritable.class, Text.class)))
>>                 .apply("ExtractPayload",
>>                         ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
>>                             @ProcessElement
>>                             public void processElement(ProcessContext c)
>>                                     throws Exception {
>>                                 c.output(c.element().getValue().toString());
>>                             }
>>                         }));
>>
>> Probably there is a better way, but this one worked for me. Writing was
>> easier, I think; it was something like this:
>>
>>             .apply("WriteToHDFS",
>>                     Write.to(new HDFSFileSink(options.getOutput(),
>>                             TextOutputFormat.class)));
>>
>> Hope it helps,
>> Ismaël
>>
>>
>> On Fri, Nov 25, 2016 at 6:44 PM, Sandeep Deshmukh <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use the recently added support for the Apex runner. This
>>> is to run the program [1] using Apex on a Hadoop cluster. The program is
>>> launching successfully.
>>>
>>> I would like to change the input and output to HDFS. I looked at
>>> HDFSFileSource and plan to use it. I would be reading a simple text file
>>> from HDFS and writing back to HDFS the same way.
>>>
>>> I tried something like the code below, but it looks like I am missing
>>> something trivial.
>>>
>>> Pipeline p = Pipeline.create(options);
>>>
>>> HDFSFileSource<IntWritable, Text> source =
>>>         HDFSFileSource.from("filePath",
>>>                 SequenceFileInputFormat.class,
>>>                 IntWritable.class, Text.class);
>>>
>>> p.apply("ReadLines", source)
>>>         .apply(new CountWords())
>>>         .apply(....)
>>>
>>> What would be the right format and <Key, Value> types to use for this?
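>>>
>>> Since it is a plain text file, I wondered whether it should instead be
>>> TextInputFormat with LongWritable keys, along these lines (just a guess):
>>>
>>> HDFSFileSource<LongWritable, Text> source =
>>>         HDFSFileSource.from("filePath",
>>>                 TextInputFormat.class,
>>>                 LongWritable.class, Text.class);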
>>>
>>> [1] https://github.com/tweise/apex-samples/blob/master/beam-apex-wordcount/src/main/java/com/example/myapexapp/Application.java
>>>
>>> Regards,
>>> Sandeep
>>>
>>
>>
>
