I'm not very familiar with the inner workings of the InputFormats. Calling .open() got rid of the NullPointerException, but the stream still produces no output.
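A note on the remaining "no output" problem: the usual way to drive a Flink FileInputFormat by hand is to configure() it once, then create input splits and open/read/close each split; skipping one of those steps can leave nextRecord() with nothing to return. Below is only a rough, untested sketch of that lifecycle, reusing the EndSongCleanedPq class and HDFS path from the FileExtractor code quoted further down; only the lifecycle calls (configure, createInputSplits, open, close) are new relative to the quoted code.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

// Rough sketch of the full InputFormat lifecycle inside the flatMap:
// configure once, then open/read/close every split of the hour folder.
private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {

    @Override
    public void flatMap(String hourFolder, Collector<EndSongCleanedPq> collector) throws Exception {
        AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<EndSongCleanedPq>(
                new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + hourFolder),
                EndSongCleanedPq.class);
        format.setReuseAvroValue(false);
        format.configure(new Configuration());

        // createInputSplits() enumerates the files under the hour folder;
        // each split has to be opened before nextRecord() can return anything.
        for (FileInputSplit split : format.createInputSplits(1)) {
            format.open(split);
            while (!format.reachedEnd()) {
                EndSongCleanedPq record = format.nextRecord(new EndSongCleanedPq());
                if (record != null) {
                    collector.collect(record);
                }
            }
            format.close();
        }
    }
}

With env.setParallelism(1) as in the quoted main(), the hour folders from fromElements("00", "01", ...) are then read one after another, which matches the rough per-subfolder ordering discussed further down in the thread.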
As a temporary solution I wrote a batch job that just unions all the
different datasets and puts them (sorted) into a single folder.

cheers Martin

On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Martin,
>
> where is the null pointer exception thrown?
> I think you didn't call the open() method of the AvroInputFormat. Maybe
> that's the issue.
>
> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann <mneum...@sics.se> wrote:
>
>> I tried to implement your idea, but I'm getting NullPointerExceptions
>> from the AvroInputFormat. Any idea what I'm doing wrong?
>> See the code below:
>>
>> public static void main(String[] args) throws Exception {
>>
>>     // set up the execution environment
>>     final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setParallelism(1);
>>
>>     env.fromElements("00", "01", "02", "03", "22", "23")
>>             .flatMap(new FileExtractor())
>>             .filter(new LocationFiter())
>>             .flatMap(new PreProcessEndSongClean())
>>             .writeAsCsv(outPath);
>>
>>     env.execute("something");
>> }
>>
>> private static class FileExtractor implements
>>         FlatMapFunction<String, EndSongCleanedPq> {
>>
>>     @Override
>>     public void flatMap(String s, Collector<EndSongCleanedPq> collector)
>>             throws Exception {
>>         AvroInputFormat<EndSongCleanedPq> avroInputFormat =
>>                 new AvroInputFormat<EndSongCleanedPq>(
>>                         new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s),
>>                         EndSongCleanedPq.class);
>>         avroInputFormat.setReuseAvroValue(false);
>>         while (!avroInputFormat.reachedEnd()) {
>>             EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>>             if (res != null) collector.collect(res);
>>         }
>>     }
>> }
>>
>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann <mneum...@sics.se> wrote:
>>
>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>> I read one file at a time. The downside I see with this is that I will
>>> not be able to read in parallel from HDFS (and the files are huge).
>>>
>>> I'll give it a try and see how much performance I lose.
>>>
>>> cheers Martin
>>>
>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Martin,
>>>>
>>>> I think you can approximate this in an easy way like this:
>>>>
>>>> - On the client, you traverse your directories to collect all files
>>>>   that you need, and collect all file paths in a list.
>>>> - Then you have a source "env.fromElements(paths)".
>>>> - Then you flatMap, and in the FlatMap, run the Avro input format
>>>>   (open it per path, then call it to get all elements).
>>>>
>>>> That gives you pretty much full control over the order in which the
>>>> files are processed.
>>>>
>>>> What do you think?
>>>>
>>>> Stephan
>>>>
>>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>>
>>>>> I forgot to mention that I'm using an AvroInputFormat to read the file
>>>>> (which might be relevant to how the flag needs to be applied).
>>>>> See the code snippet below:
>>>>>
>>>>> DataStream<EndSongCleanedPq> inStream =
>>>>>         env.readFile(new AvroInputFormat<EndSongCleanedPq>(
>>>>>                 new Path(filePath), EndSongCleanedPq.class), filePath);
>>>>>
>>>>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>
>>>>>> The program is a DataStream program; it usually gets its data from
>>>>>> Kafka. It's an anomaly-detection program that learns from the stream
>>>>>> itself. The reason I want to read from files is to test different
>>>>>> settings of the algorithm and compare them.
>>>>>>
>>>>>> I think I don't need to replay things in the exact order (which is not
>>>>>> possible with parallel reads anyway), and I have written the program so
>>>>>> it can deal with out-of-order events.
>>>>>> I only need the subfolders to be processed roughly in order. It's fine
>>>>>> to process some stuff from 01 before everything from 00 is finished;
>>>>>> if I get records from all 24 subfolders at the same time, though,
>>>>>> things will break. If I set the flag, will it try to get data from all
>>>>>> subdirectories in parallel, or will it go subdirectory by subdirectory?
>>>>>>
>>>>>> Also, can you point me to some documentation or something where I can
>>>>>> see how to set the flag?
>>>>>>
>>>>>> cheers Martin
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Going through nested folders is pretty simple; there is a flag on
>>>>>>> the FileInputFormat that makes sure those are read.
>>>>>>>
>>>>>>> Tricky is the part that all "00" files should be read before the
>>>>>>> "01" files. If you still want parallel reads, that means you need to
>>>>>>> sync at some point and wait for all parallel parts to finish the "00"
>>>>>>> work before anyone may start with the "01" work.
>>>>>>>
>>>>>>> Is your training program a DataStream or a DataSet program?
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann <mneum...@sics.se> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have a streaming machine learning job that usually runs with
>>>>>>>> input from Kafka. To tweak the models I need to run it on some old
>>>>>>>> data from HDFS.
>>>>>>>>
>>>>>>>> Unfortunately the data on HDFS is spread out over several
>>>>>>>> subfolders. Basically I have a date folder with one subfolder for
>>>>>>>> each hour; within those are the actual input files I'm interested in.
>>>>>>>>
>>>>>>>> What I need is a source that goes through the subfolders in order
>>>>>>>> and streams the files into the program. I'm using event timestamps,
>>>>>>>> so all files in 00 need to be processed before 01.
>>>>>>>>
>>>>>>>> Does anyone have an idea how to do this?
>>>>>>>>
>>>>>>>> cheers Martin
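On the documentation question about the nested-folder flag: the flag Stephan refers to is the "recursive.file.enumeration" key on FileInputFormat, described in the Flink DataSet API documentation under recursive traversal of the input path directory. Below is a minimal sketch of that documented batch form; the HDFS path is a placeholder, and the flag only makes files in nested folders visible, it does not read the hour subfolders in any particular order. When driving the AvroInputFormat by hand, as in the flatMap approach above, the same Configuration can be handed to the format's configure() call instead.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

// Sketch: recursive enumeration of nested input files in the DataSet API.
// "recursive.file.enumeration" is the FileInputFormat flag mentioned above;
// the path is a placeholder, not one of the paths from this thread.
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);

DataSet<String> lines = batchEnv.readTextFile("hdfs:///path/with/nested/files")
        .withParameters(parameters);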