I am able to read and write from HDFS successfully. Here is my code snippet:

p.apply("ReadFromHDFS",
        Read.from(HDFSFileSource.from(options.getInputFile(),
            TextInputFormat.class, LongWritable.class, Text.class)))
 .apply("ExtractPayload", ParDo.of(new ExtractString()))
 .apply(new CountWords())
 .apply("WriteToHDFS",
        Write.to(new HDFSFileSink(options.getOutput(), TextOutputFormat.class)));
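
For reference, ExtractString is essentially the anonymous DoFn from Ismaël's
reply below, moved into a static nested class (a minimal sketch; the exact
class isn't shown in this thread, and the imports are the ones listed below):

    // A static nested class avoids serialising the enclosing class.
    static class ExtractString extends DoFn<KV<LongWritable, Text>, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Keep only the Text payload, dropping the LongWritable offset key.
        c.output(c.element().getValue().toString());
      }
    }
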
One observation is that the output is one file per word. How can I store
all the data in a single file and avoid the creation of one file per word?
Regards,
Sandeep
On Sat, Nov 26, 2016 at 9:45 AM, Sandeep Deshmukh <[email protected]>
wrote:
> Thanks, that helped. I faced serialisation issues, so I moved the DoFn code
> into a static class and it worked. (An anonymous inner DoFn keeps a
> reference to its enclosing class, which then also has to be serializable; a
> static nested class avoids that.)
>
> I used the following imports:
>
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
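>
> HDFSFileSource itself lives in the beam-sdks-java-io-hdfs module; the
> matching Beam-side imports would presumably be:
>
> import org.apache.beam.sdk.io.Read;
> import org.apache.beam.sdk.io.hdfs.HDFSFileSource;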
>
> At least I am now able to move on with the input side; I will check the
> output side too.
>
> Thanks & Regards,
> Sandeep
>
> On Sat, Nov 26, 2016 at 1:35 AM, Ismaël Mejía <[email protected]> wrote:
>
>> Hello,
>>
>> I achieved this (reading a text file via HDFSFileSource) like this:
>>
>> PCollection<String> data = pipeline
>>     .apply("ReadFromHDFS",
>>         Read.from(HDFSFileSource.from(options.getInput(),
>>             TextInputFormat.class, LongWritable.class, Text.class)))
>>     .apply("ExtractPayload",
>>         ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
>>           @ProcessElement
>>           public void processElement(ProcessContext c) throws Exception {
>>             c.output(c.element().getValue().toString());
>>           }
>>         }));
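>>
>> (The LongWritable key here is just the byte offset that TextInputFormat
>> assigns to each line; only the Text payload is kept.)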
>>
>> Probably there is a better way, but this one worked for me. Writing was
>> easier, I think; it was something like this:
>>
>> .apply("WriteToHDFS", Write.to(new
>> HDFSFileSink(options.getOutput(), TextOutputFormat.class))
>>
>> Hope it helps,
>> Ismaël
>>
>>
>> On Fri, Nov 25, 2016 at 6:44 PM, Sandeep Deshmukh <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use the recently added support for the Apex runner, to run
>>> the program[1] with Apex on a Hadoop cluster. The program launches
>>> successfully.
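>>>
>>> For context, the runner is selected via pipeline options, roughly like
>>> this (a sketch assuming the ApexRunner and ApexPipelineOptions classes
>>> from the Apex runner module; details omitted):
>>>
>>> // Classes from org.apache.beam.runners.apex (assumed available here).
>>> ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
>>>     .withValidation().as(ApexPipelineOptions.class);
>>> options.setRunner(ApexRunner.class);
>>> Pipeline p = Pipeline.create(options);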
>>>
>>> I would like to change the input and output to HDFS. I looked at
>>> HDFSFileSource and am planning to use it: reading a simple text file from
>>> HDFS and, likewise, writing back to HDFS.
>>>
>>> I tried something like the following, but it looks like I am missing
>>> something trivial.
>>>
>>> Pipeline p = Pipeline.create(options);
>>> HDFSFileSource<IntWritable, Text> source =
>>>     HDFSFileSource.from("filePath", SequenceFileInputFormat.class,
>>>         IntWritable.class, Text.class);
>>>
>>> p.apply("ReadLines", source)
>>>  .apply(new CountWords())
>>>  .apply(....)
>>>
>>> What would be the right input format and <Key, Value> types to use for
>>> this?
>>>
>>> [1] https://github.com/tweise/apex-samples/blob/master/beam-apex-wordcount/src/main/java/com/example/myapexapp/Application.java
>>>
>>> Regards,
>>> Sandeep
>>>
>>
>>
>