You can try using wholeTextFiles, which will give you a pair RDD of
(fileName, content). flatMap over this and manipulate the content.
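
Roughly something along these lines (an untested sketch based on your mail:
parseLine is just a placeholder for your own mapping of a csv row plus the
asset name into TimeSeriesData, validate is your existing check, and on
Spark 1.x the flatMap function can return an Iterable such as a List):

    // needs: java.util.List, java.util.ArrayList, org.apache.spark.api.java.JavaPairRDD
    JavaPairRDD<String, String> files =
            sc.wholeTextFiles(resourceBundle.getString(FOLDER_NAME));

    JavaRDD<TimeSeriesData> tsd = files.flatMap(fileAndContent -> {
        String[] lines = fileAndContent._2().split("\n");
        String asset = lines[0].trim();             // first line: asset name
        String[] headers = lines[1].split(",");     // second line: csv header
        List<TimeSeriesData> records = new ArrayList<>();
        for (int i = 2; i < lines.length; i++) {    // data starts from the third line
            if (validate(lines[i])) {
                records.add(parseLine(asset, headers, lines[i]));
            }
        }
        return records;
    });

Each record then already carries the asset name of the file it came from, so
you can save tsd to Cassandra exactly as you do now. Note that wholeTextFiles
reads each file into memory as a single string, so it suits many smallish
files better than a few very large ones.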

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Check out Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane <pankaj.wah...@qiotec.com>
wrote:

> Hi community members,
>
>
> Apache Spark is fantastic and very easy to learn. Awesome work!
>
> *Question:*
>
> I have multiple files in a folder, and the first line in each file is the
> name of the asset that the file belongs to. The second line is the csv
> header row, and the data starts from the third row.
>
> Ex: File 1
>
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
>
> Ex: File 2
>
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
>
> I have nearly 1000 files in each folder, around 10 GB in size.
>
> I am using the Apache Spark Java API to read all these files.
>
> The following is the code extract I am using:
>
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>     // Read files
>     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>     // Get asset
>     String asset = data.take(1).get(0);
>     // Extract time series data
>     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMITER));
>     // Strip header
>     String header = actualData.take(1).get(0);
>     String[] headers = header.split(DELIMITER);
>     // Extract actual data
>     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>     // Extract valid records
>     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>     // Find granularity
>     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>     // Transform to TSD objects
>     JavaRDD<TimeSeriesData> tsdFlatMap =
>             transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
>
>     // Save to Cassandra
>     javaFunctions(tsdFlatMap)
>             .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>                     "time_series_data", mapToRow(TimeSeriesData.class))
>             .saveToCassandra();
>
>     System.out.println("Total Records: " + timeSeriesLines.count());
>     System.out.println("Valid Records: " + validated.count());
> }
>
> Within the TimeSeriesData object I need to set the asset name for each
> reading, so I need the output of data.take(1) to be different for
> different files.
>
> Thank You.
>
> Best Regards,
> Pankaj
>
