Hi!

Have a look at the class-level comments in "InputFormat". They describe how input formats work: the splits are first generated on the master (for parallelization), and the workers then open and read each split.

So you need something like this:

    AvroInputFormat<EndSongCleanedPq> avroInputFormat =
            new AvroInputFormat<EndSongCleanedPq>(
                    new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
    avroInputFormat.setReuseAvroValue(false);

    for (FileInputSplit split : avroInputFormat.createInputSplits(1)) {
        avroInputFormat.open(split);
        while (!avroInputFormat.reachedEnd()) {
            EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
            if (res != null) collector.collect(res);
        }
    }

Hope that helps.

Stephan
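A minimal sketch (not from the original mails) of the client-side part Stephan describes further down the thread: list the hourly subfolders on the client, sort them, and feed the folder names through a non-parallel source into the FlatMap. The base path and the FileExtractor / EndSongCleanedPq names are taken from the thread; the Hadoop FileSystem calls, the class name, and the assumption that the Hadoop config (core-site.xml) is on the classpath are illustrative.

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReplayHourFolders {

        public static void main(String[] args) throws Exception {
            final String basePath = "hdfs:///anonym/cleaned/endsong/2016-01-01/";

            // Runs on the client: list the hourly subfolders ("00" .. "23") and sort them,
            // so the source emits them in order. Assumes the Hadoop config is available.
            FileSystem fs = FileSystem.get(new URI(basePath), new Configuration());
            List<String> hours = new ArrayList<String>();
            for (FileStatus status : fs.listStatus(new Path(basePath))) {
                if (status.isDirectory()) {
                    hours.add(status.getPath().getName());
                }
            }
            Collections.sort(hours);

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromCollection(hours)
               .flatMap(new FileExtractor())   // the FlatMap from this thread, reading one folder at a time
               .setParallelism(1)              // parallelism 1 keeps the folders roughly in order
               .print();                       // or the filter / flatMap / writeAsCsv pipeline from the thread

            env.execute("replay HDFS folders in order");
        }
    }

This only gives a rough ordering: files within one folder are read one after the other by the single FlatMap instance, but event-time order across records is not enforced, which (per the discussion below) the job tolerates.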
On Tue, Feb 23, 2016 at 12:04 PM, Martin Neumann <mneum...@sics.se> wrote:

> I'm not very familiar with the inner workings of the InputFormats. Calling
> .open() got rid of the NullPointerException, but the stream still produces no output.
>
> As a temporary solution I wrote a batch job that just unions all the
> different datasets and puts them (sorted) into a single folder.
>
> cheers Martin
>
> On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Martin,
>>
>> Where is the NullPointerException thrown?
>> I think you didn't call the open() method of the AvroInputFormat. Maybe
>> that's the issue.
>>
>> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneum...@sics.se> wrote:
>>
>>> I tried to implement your idea, but I'm getting NullPointerExceptions
>>> from the AvroInputFormat. Any idea what I'm doing wrong?
>>> See the code below:
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>     // set up the execution environment
>>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setParallelism(1);
>>>
>>>     env.fromElements("00", "01", "02", "03", "22", "23")
>>>         .flatMap(new FileExtractor())
>>>         .filter(new LocationFiter())
>>>         .flatMap(new PreProcessEndSongClean())
>>>         .writeAsCsv(outPath);
>>>
>>>     env.execute("something");
>>> }
>>>
>>> private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {
>>>
>>>     @Override
>>>     public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
>>>         AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(
>>>                 new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
>>>         avroInputFormat.setReuseAvroValue(false);
>>>         while (!avroInputFormat.reachedEnd()) {
>>>             EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>>>             if (res != null) collector.collect(res);
>>>         }
>>>     }
>>> }
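For reference, a sketch (not from the original mails) of what Robert's hint looks like when applied to the FlatMap above, combined with the split handling from Stephan's reply at the top of the thread: the splits have to be enumerated and each one opened before nextRecord() can return anything, and the format should be closed again. The EndSongCleanedPq type and the path are the ones from the thread.

    // imports: org.apache.flink.api.common.functions.FlatMapFunction,
    //          org.apache.flink.api.java.io.AvroInputFormat,
    //          org.apache.flink.core.fs.FileInputSplit, org.apache.flink.core.fs.Path,
    //          org.apache.flink.util.Collector
    private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {

        @Override
        public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
            AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<EndSongCleanedPq>(
                    new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
            format.setReuseAvroValue(false);

            // Enumerate the files behind the path and open each split before reading from it.
            for (FileInputSplit split : format.createInputSplits(1)) {
                format.open(split);
                try {
                    while (!format.reachedEnd()) {
                        EndSongCleanedPq record = format.nextRecord(new EndSongCleanedPq());
                        if (record != null) {
                            collector.collect(record);
                        }
                    }
                } finally {
                    format.close();
                }
            }
        }
    }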
>>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>
>>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>>> I read one file at a time. The downside I see with this is that I will
>>>> not be able to read from HDFS in parallel (and the files are huge).
>>>>
>>>> I'll give it a try and see how much performance I lose.
>>>>
>>>> cheers Martin
>>>>
>>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Martin,
>>>>>
>>>>> I think you can approximate this in an easy way like this:
>>>>>
>>>>> - On the client, you traverse your directories to collect all files
>>>>>   that you need; collect all file paths in a list.
>>>>> - Then you have a source "env.fromElements(paths)".
>>>>> - Then you flatMap, and in the FlatMap, run the Avro input format
>>>>>   (open it per path, then call it to get all elements).
>>>>>
>>>>> That gives you pretty much full control over the order in which the
>>>>> files are processed.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>
>>>>>> I forgot to mention I'm using an AvroInputFormat to read the file
>>>>>> (that might be relevant to how the flag needs to be applied).
>>>>>> See the code snippet below:
>>>>>>
>>>>>> DataStream<EndSongCleanedPq> inStream =
>>>>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(new Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>>
>>>>>>> The program is a DataStream program; it usually gets the data from
>>>>>>> Kafka. It's an anomaly detection program that learns from the stream
>>>>>>> itself. The reason I want to read from files is to test different
>>>>>>> settings of the algorithm and compare them.
>>>>>>>
>>>>>>> I think I don't need to replay things in the exact order (which is
>>>>>>> not possible with parallel reads anyway), and I have written the
>>>>>>> program so it can deal with out-of-order events.
>>>>>>> I only need the subfolders to be processed roughly in order. It's
>>>>>>> fine to process some stuff from 01 before everything from 00 is
>>>>>>> finished; if I get records from all 24 subfolders at the same time,
>>>>>>> things will break though. If I set the flag, will it try to get data
>>>>>>> from all subdirectories in parallel, or will it go subdirectory by
>>>>>>> subdirectory?
>>>>>>>
>>>>>>> Also, can you point me to some documentation or something where I can
>>>>>>> see how to set the flag?
>>>>>>>
>>>>>>> cheers Martin
>>>>>>>
>>>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Going through nested folders is pretty simple; there is a flag on
>>>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>>>
>>>>>>>> The tricky part is that all "00" files should be read before the
>>>>>>>> "01" files. If you still want parallel reads, that means you need to
>>>>>>>> sync at some point and wait for all parallel parts to finish the "00"
>>>>>>>> work before anyone may start with the "01" work.
>>>>>>>>
>>>>>>>> Is your training program a DataStream or a DataSet program?
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>>>> input from Kafka. To tweak the models I need to run on some old data
>>>>>>>>> from HDFS.
>>>>>>>>>
>>>>>>>>> Unfortunately the data on HDFS is spread out over several
>>>>>>>>> subfolders. Basically I have a folder per date with one subfolder for
>>>>>>>>> each hour; within those are the actual input files I'm interested in.
>>>>>>>>>
>>>>>>>>> Basically what I need is a source that goes through the subfolders
>>>>>>>>> in order and streams the files into the program. I'm using event
>>>>>>>>> timestamps, so all files in 00 need to be processed before 01.
>>>>>>>>>
>>>>>>>>> Does anyone have an idea how to do this?
>>>>>>>>>
>>>>>>>>> cheers Martin
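The "flag" Stephan mentions is FileInputFormat's recursive (nested) file enumeration. A minimal sketch, not from the original mails, of enabling it via the "recursive.file.enumeration" parameter on a DataSet source; the AvroInputFormat and EndSongCleanedPq type from the thread are assumed, and in newer Flink versions the same thing can be set directly on the format with setNestedFileEnumeration(true).

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;

    public class NestedEnumerationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Point the format at the parent folder; nested hour folders are picked up below.
            AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<EndSongCleanedPq>(
                    new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"), EndSongCleanedPq.class);

            // Enable recursive enumeration of nested input files for this source.
            Configuration parameters = new Configuration();
            parameters.setBoolean("recursive.file.enumeration", true);

            DataSet<EndSongCleanedPq> all = env.createInput(format).withParameters(parameters);

            // In newer Flink versions: format.setNestedFileEnumeration(true);

            all.first(10).print();
        }
    }

Note that, as Stephan points out above, this only makes the nested files readable; it does not impose any order between the hour folders, which is why the thread ends up at the fromElements/FlatMap approach instead.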