Sorry Palle, I misunderstood: I thought you were trying to read a single JSON object per file. The solution suggested by Fabian is definitely the right one for your specific use case!
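Fabian's suggestion quoted below combines three steps: recursive traversal of the input directory, ExecutionEnvironment.readTextFile(), and a RichMapFunction that creates a Jackson parser once in open(). Against the Flink 1.0 DataSet API, that could look roughly like the following sketch. It assumes jackson-databind is on the classpath and that the input directory is passed as the first program argument. The Event POJO, its fields (user, count) and the class names are hypothetical placeholders for your own data.

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class JsonLinesJob {

    // Hypothetical POJO: public class, public no-arg constructor and public
    // fields, so both Flink and Jackson can handle it without extra setup.
    public static class Event {
        public String user;
        public long count;

        public Event() {}

        @Override
        public String toString() {
            return user + "=" + count;
        }
    }

    // Parses one JSON line into an Event. The ObjectMapper is created once per
    // parallel task instance in open(), not once per incoming line.
    public static class JsonLineToPojo extends RichMapFunction<String, Event> {
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }

        @Override
        public Event map(String line) throws Exception {
            return mapper.readValue(line, Event.class);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Also enumerate files in nested sub-directories of the input path.
        Configuration readParams = new Configuration();
        readParams.setBoolean("recursive.file.enumeration", true);

        // args[0] is assumed to be a directory such as hdfs:///path/to/json
        DataSet<Event> events = env.readTextFile(args[0])
            .withParameters(readParams)
            .map(new JsonLineToPojo());

        // Example analysis: total count per user (groupBy on a POJO field name).
        events.groupBy("user")
            .reduce((a, b) -> {
                Event sum = new Event();
                sum.user = a.user;
                sum.count = a.count + b.count;
                return sum;
            })
            .print(); // print() triggers execution of the batch job
    }
}
```

Because each line is parsed independently, the text input format can split the files and read them in parallel. The mapper emits typed POJOs, which can then feed the groupBy/reduceGroup style transformations from the WordCount example.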
Best,
Flavio

On 7 May 2016 12:52, "Fabian Hueske" wrote:

> Hi Palle,
>
> you can recursively read all files in a folder as explained in the
> "Recursive Traversal of the Input Path Directory" section of the Data
> Source documentation [1].
>
> The easiest way to read line-wise JSON objects is to use
> ExecutionEnvironment.readTextFile(), which reads text files line by line as
> strings, followed by a mapper that uses a JSON parser (e.g., Jackson) to
> parse the JSON strings. You should use a RichMapFunction and create the
> parser in the open() method to avoid instantiating a new parser for each
> incoming line. After parsing, the RichMapFunction can emit POJOs.
>
> Cheers,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources
>
> 2016-05-07 12:25 GMT+02:00 Flavio Pompermaier:
>
>> I had the same issue :)
>> I resolved it by reading all file paths into a collection and then using
>> this code:
>>
>> env.fromCollection(filePaths).rebalance().map(file2pojo)
>>
>> and you have your dataset of POJOs!
>>
>> The rebalance() is necessary to exploit parallelism; otherwise the
>> pipeline will be executed with parallelism 1.
>>
>> Best,
>> Flavio
>>
>> On 7 May 2016 12:13, "Palle" wrote:
>>
>> Hi there.
>>
>> I've got an HDFS folder containing a lot of files. Each file contains a
>> lot of JSON objects, one per line. I will have several TB in the HDFS
>> folder.
>>
>> My plan is to make Flink read all files and all JSON objects and then do
>> some analysis on the data, actually very similar to the
>> flatMap/groupBy/reduceGroup transformations that are done in the WordCount
>> example.
>>
>> But I am a bit stuck, because I cannot figure out how to make Flink
>> read all files in an HDFS dir and then perform the transformations on the
>> data. I have googled quite a bit and also looked in the Flink API and mail
>> history.
>>
>> Can anyone point me to an example where Flink is used to read all files
>> in an HDFS folder and then do transformations on the data?
>>
>> And a second question: is there an elegant way to make Flink handle the
>> JSON objects? Can they be converted to POJOs by something similar to the
>> pojoType() method?
>>
>> /Palle
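For comparison, Flavio's quoted one-liner, env.fromCollection(filePaths).rebalance().map(file2pojo), targets a different layout: one JSON object per file. A minimal sketch of that variant follows, assuming the file paths are passed as program arguments and jackson-databind is on the classpath. FileToPojo stands in for the hypothetical file2pojo, and the Doc POJO is also hypothetical.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.util.Arrays;
import java.util.List;

public class JsonFilePerObjectJob {

    // Hypothetical POJO holding one parsed file.
    public static class Doc {
        public String id;

        public Doc() {}
    }

    // Hypothetical stand-in for "file2pojo": opens one whole file through
    // Flink's FileSystem abstraction and parses it as a single JSON object.
    public static class FileToPojo extends RichMapFunction<String, Doc> {
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper();
        }

        @Override
        public Doc map(String file) throws Exception {
            Path path = new Path(file);
            FileSystem fs = path.getFileSystem();
            try (FSDataInputStream in = fs.open(path)) {
                return mapper.readValue(in, Doc.class);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Assumption: the (non-empty) list of file paths comes from the arguments.
        List<String> filePaths = Arrays.asList(args);

        // fromCollection() is a non-parallel source, so rebalance() spreads the
        // paths round-robin over all parallel instances of the map function.
        DataSet<Doc> docs = env.fromCollection(filePaths)
            .rebalance()
            .map(new FileToPojo());

        docs.print();
    }
}
```

With one object per line, as in Palle's case, this variant gives up the parallel, split-based reading that readTextFile() provides. That is why the line-wise approach fits his data better.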
