Thanks, that helped. I ran into serialisation issues, so I moved the DoFn
code into a static class, and it worked.
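
For anyone hitting the same error, here is a minimal sketch of one common cause. It assumes the DoFn was an anonymous class that referenced its enclosing, non-serialisable class; the exact failure is not stated in this thread. The sketch uses only java.io, not Beam: Fn, PipelineBuilder and TrimFn are hypothetical stand-ins for a DoFn, the class that builds the pipeline, and the static replacement.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSketch {

    // Hypothetical stand-in for a DoFn: a function object that must be serialisable.
    public interface Fn extends Serializable {
        String apply(String input);
    }

    // Hypothetical stand-in for the class that builds the pipeline.
    // Note that it does NOT implement Serializable.
    public static class PipelineBuilder {
        private final String suffix = "!";

        // The anonymous class reads a field of the enclosing instance, so it
        // holds a reference to PipelineBuilder.this, which gets serialised too.
        public Fn anonymousFn() {
            return new Fn() {
                @Override
                public String apply(String input) {
                    return input.trim() + suffix;
                }
            };
        }
    }

    // Static nested class: no reference to any enclosing instance.
    // Any state it needs is passed in through the constructor.
    public static class TrimFn implements Fn {
        private static final long serialVersionUID = 1L;
        private final String suffix;

        public TrimFn(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String apply(String input) {
            return input.trim() + suffix;
        }
    }

    // Returns true if Java serialisation accepts the object, false if it
    // fails with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous:     "
            + canSerialize(new PipelineBuilder().anonymousFn()));
        System.out.println("static nested: "
            + canSerialize(new TrimFn("!")));
    }
}
```

Both functions do the same work. Only the anonymous one drags the non-serialisable enclosing object into the serialised form, which is why moving the code into a static class can help.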

I used the following imports:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

At least I can now move on with the input side. I will check the output side too.

Thanks & Regards,
Sandeep

On Sat, Nov 26, 2016 at 1:35 AM, Ismaël Mejía <[email protected]> wrote:

> Hello,
>
> I achieved this (reading a text file from HdfsSource) like this:
>
>         PCollection<String> data = pipeline
>                 .apply("ReadFromHDFS", Read.from(
>                         HDFSFileSource.from(options.getInput(),
>                                 TextInputFormat.class,
>                                 LongWritable.class, Text.class)))
>                 .apply("ExtractPayload", ParDo.of(
>                         new DoFn<KV<LongWritable, Text>, String>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c)
>                             throws Exception {
>                         c.output(c.element().getValue().toString());
>                     }
>                 }));
>
> There is probably a better way, but this one worked for me. Writing was
> easier, I think; it was something like this:
>
>             .apply("WriteToHDFS", Write.to(
>                     new HDFSFileSink(options.getOutput(),
>                             TextOutputFormat.class)))
>
> Hope it helps,
> Ismaël
>
>
> On Fri, Nov 25, 2016 at 6:44 PM, Sandeep Deshmukh <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to use the recently added support for the Apex runner, to
>> run the program [1] with Apex on a Hadoop cluster. The program launches
>> successfully.
>>
>> I would like to change the input and output to HDFS. I looked at
>> HDFSFileSource and plan to use it. I want to read a simple text file
>> from HDFS and write to HDFS in the same way.
>>
>> I tried something like the code below, but it looks like I am missing
>> something trivial.
>>
>> Pipeline p = Pipeline.create(options);
>> HDFSFileSource<IntWritable, Text> source =
>>                 HDFSFileSource.from("filePath",
>>                     SequenceFileInputFormat.class, IntWritable.class,
>>                     Text.class);
>>
>> p.apply("ReadLines", source)
>>        .apply(new CountWords())
>>        .apply(....)
>>
>> What would be the right format and <Key,Value> types to use for this?
>>
>> [1] https://github.com/tweise/apex-samples/blob/master/beam-apex-wordcount/src/main/java/com/example/myapexapp/Application.java
>>
>> Regards,
>> Sandeep
>>
>
>
