You can try using wholeTextFiles, which will give you a pair RDD of (fileName, fileContent). flatMap over this and parse each file's content, taking the asset name from the first line.
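In Spark that would look roughly like `sc.wholeTextFiles(path).flatMap(pair -> parse(pair._2()).iterator())`. The sketch below shows only the parsing half, so it runs without a cluster; the class and method names are illustrative, not from your code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the per-file parsing you would run inside flatMap over
// sc.wholeTextFiles(...): each input is one file's full content, where the
// first line is the asset name, the second the CSV header, the rest data.
public class PerFileParser {

    // One parsed reading, tagged with the asset it came from.
    public static class Record {
        public final String asset;
        public final String line;
        public Record(String asset, String line) {
            this.asset = asset;
            this.line = line;
        }
    }

    // Turn one file's whole content into asset-tagged data records.
    public static List<Record> parse(String content) {
        String[] lines = content.split("\\r?\\n");
        List<Record> out = new ArrayList<>();
        if (lines.length < 3) {
            return out; // no data rows
        }
        String asset = lines[0].trim(); // line 1: asset name
        // lines[1] is the CSV header; data starts at index 2
        for (int i = 2; i < lines.length; i++) {
            if (!lines[i].isEmpty()) {
                out.add(new Record(asset, lines[i]));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String file = "TestAsset01\nTime,dp_1,dp_2,dp_3\n"
                + "11-01-2015 15:00:00,123,456,789\n"
                + "11-01-2015 15:00:01,123,456,789\n";
        for (Record r : parse(file)) {
            System.out.println(r.asset + " -> " + r.line);
        }
    }
}
```

Because each element of the pair RDD carries one whole file, the asset name stays correctly paired with that file's rows, which `textFile` over the folder cannot do.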
Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Check out Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
<http://in.linkedin.com/in/sonalgoyal>

On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane <pankaj.wah...@qiotec.com> wrote:
> Hi community members,
>
> Apache Spark is fantastic and very easy to learn. Awesome work!
>
> *Question:*
>
> I have multiple files in a folder, and the first line in each file is the
> name of the asset that the file belongs to. The second line is the CSV
> header row, and data starts from the third row.
>
> Ex: File 1
>
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
>
> Ex: File 2
>
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
>
> I have nearly 1000 files in each folder, sizing ~10G.
>
> I am using the Apache Spark Java API to read all these files.
> Following is a code extract that I am using:
>
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>     // Read files
>     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>     // Get asset
>     String asset = data.take(1).get(0);
>     // Extract time-series data
>     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMITER));
>     // Strip header
>     String header = actualData.take(1).get(0);
>     String[] headers = header.split(DELIMITER);
>     // Extract actual data
>     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>     // Extract valid records
>     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>     // Find granularity
>     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>     // Transform to TSD objects
>     JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset,
>             readingTypeMap, headers, granularity);
>
>     // Save to Cassandra
>     javaFunctions(tsdFlatMap)
>             .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>                     "time_series_data", mapToRow(TimeSeriesData.class))
>             .saveToCassandra();
>
>     System.out.println("Total Records: " + timeSeriesLines.count());
>     System.out.println("Valid Records: " + validated.count());
> }
>
> Within the TimeSeriesData object I need to set the asset name for the
> reading, so I need the output of data.take(1) to be different for
> different files.
>
> Thank You.
>
> Best Regards,
> Pankaj
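As an aside, the `validate` helper isn't shown in the extract above; a minimal hypothetical version (the delimiter, field count, and date format below are assumptions based on the sample rows, not the original code) might check the column count and that the timestamp parses:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical validator for rows like "11-01-2015 15:00:00,123,456,789".
// The real validate(...) from the post is not shown; this only illustrates
// the idea of rejecting malformed lines before the transform step.
public class LineValidator {

    private static final String DELIMITER = ",";
    private static final int EXPECTED_FIELDS = 4; // Time + dp_1..dp_3

    public static boolean validate(String line) {
        String[] fields = line.split(DELIMITER, -1);
        if (fields.length != EXPECTED_FIELDS) {
            return false; // wrong column count
        }
        // Assumed day-month-year format, matching the sample data
        SimpleDateFormat fmt = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        fmt.setLenient(false);
        try {
            fmt.parse(fields[0]); // timestamp must parse
        } catch (ParseException e) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(validate("11-01-2015 15:00:00,123,456,789"));
        System.out.println(validate("not a timestamp,123,456,789"));
    }
}
```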