Hi Basu, if all the columns are separated by the delimiter "\t", the CSV parser might be a better choice, for example:
```scala
spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
```

More details are in the [DataFrameReader API](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader).

That gives us two DataFrames; you can register them as temporary tables and use SQL to join them (see the sketch after the quoted steps below).

On Fri, Feb 17, 2017 at 10:33 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:

> Hey Chris,
>
> Thanks for your quick help. Actually the dataset had issues; the logic I implemented was not wrong.
>
> This is what I did:
>
> 1) *V. Imp* – Create the rows by splitting the columns of the tab-delimited file before converting into a DF:
>
>     val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
>       x.split("\t")(2).toInt, x.split("\t")(3).toInt))
>
> Do a take to see whether it throws an error (this step just confirms everything is fine, since the execution is lazy):
>
>     stati.take(2)
>
> *Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5), (asfsds,***,43,1))
>
> If this comes out, it is working fine and we can proceed.
>
> 2) *V. Imp* – Now convert into a DF:
>
>     val station = stati.toDF("StationKey","StationName","Temparature","StationID")
>
> Now do a show to see how it looks:
>
>     station.show
>
> *Ans:*
>
>     +----------+-----------+-----------+---------+
>     |StationKey|StationName|Temparature|StationID|
>     +----------+-----------+-----------+---------+
>     |     uihgf|       Pune|         56|        5|
>     |    asfsds|        ***|         43|        1|
>     |    fkwsdf|     Mumbai|         45|        6|
>     |      gddg|       ABCD|         32|        2|
>     |     grgzg|     *CSD**|         35|        3|
>     |     gsrsn|     Howrah|         22|        4|
>     |     ffafv|        ***|         34|        7|
>     +----------+-----------+-----------+---------+
>
> 3) Do the same for the other dataset:
>
>     i)   val storr = stor.map(p => (p.split("\t")(0).toInt, p.split("\t")(1),
>            p.split("\t")(2).toInt, p.split("\t")(3)))
>     ii)  storr.take(2)
>     iii) val storm = storr.toDF("ID","Name","Temp","Code")
>     iv)  storm.show
>
> 4) Register them as tables:
>
>     val stations2 = station.registerTempTable("Stations")
>     val storms2 = storm.registerTempTable("Storms")
>
> 5) Query the joined data as per the requirements:
>
>     val joinedDF = sqlContext.sql("Select Stations.StationName as StationName, Stations.StationID as StationID from Stations inner join Storms on Storms.Code = Stations.StationKey where Stations.Temparature > 35")
>
> 6) joinedDF.show
>
>     +-----------+---------+
>     |StationName|StationID|
>     +-----------+---------+
>     |       Pune|        5|
>     +-----------+---------+
>
> 7) Save the output as CSV:
>
>     joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")
>
> Thanks,
>
> Aakash.
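For completeness, here is a minimal sketch of the DataFrameReader route suggested at the top of this mail, applied to the same pipeline as the quoted steps. It reuses the paths and column names from the thread and assumes Spark 2.x (SparkSession, createOrReplaceTempView) and that both files contain exactly four tab-separated columns with no header; treat it as a sketch rather than tested code.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a `spark` session already exists; the app name here is arbitrary.
val spark = SparkSession.builder().appName("StationsStormsJoin").getOrCreate()

// Read both tab-delimited files; the CSV reader names the columns _c0.._c3,
// so rename them to match the quoted steps.
val stations = spark.read
  .option("sep", "\t")
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
  .toDF("StationKey", "StationName", "Temparature", "StationID")

val storms = spark.read
  .option("sep", "\t")
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Storms.txt")
  .toDF("ID", "Name", "Temp", "Code")

// Register both DataFrames and join them with SQL, mirroring steps 4) and 5) above.
stations.createOrReplaceTempView("Stations")
storms.createOrReplaceTempView("Storms")

val joinedDF = spark.sql(
  """SELECT Stations.StationName AS StationName, Stations.StationID AS StationID
    |FROM Stations INNER JOIN Storms ON Storms.Code = Stations.StationKey
    |WHERE Stations.Temparature > 35""".stripMargin)

// Write the result as CSV directly instead of going through an RDD (step 7).
joinedDF.coalesce(1)
  .write
  .csv("/user/root/spark_demo/scala/data/output/Question6Soln")
```

Adding `.option("mode", "DROPMALFORMED")` on the reader can also help if some input lines do not have all four fields.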
> On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <christophe.pre...@kelkoo.com> wrote:
>
>> Hi Aakash,
>>
>> You can try this:
>>
>>     import org.apache.spark.sql.Row
>>     import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>>     val header = Array("col1", "col2", "col3", "col4")
>>     val schema = StructType(header.map(StructField(_, StringType, true)))
>>
>>     val statRow = stat.map(line => Row(line.split("\t"):_*))
>>     val df = spark.createDataFrame(statRow, schema)
>>
>>     df.show
>>     +------+------+----+----+
>>     |  col1|  col2|col3|col4|
>>     +------+------+----+----+
>>     | uihgf| Paris|  56|   5|
>>     |asfsds|   ***|  43|   1|
>>     |fkwsdf|London|  45|   6|
>>     |  gddg|  ABCD|  32|   2|
>>     | grgzg|  *CSD|  35|   3|
>>     | gsrsn|  ADR*|  22|   4|
>>     +------+------+----+----+
>>
>> Please let me know if this works for you.
>>
>> Regards,
>> Christophe.
>>
>> On 17/02/17 10:37, Aakash Basu wrote:
>>
>> Hi all,
>>
>> Without using a case class, I tried making a DF to work on the join and other filtering later, but I'm getting an ArrayIndexOutOfBoundsException while doing a show of the DF.
>>
>> 1) Importing SQLContext:
>>
>>     import org.apache.spark.sql.SQLContext._
>>     import org.apache.spark.sql.SQLContext
>>
>> 2) Initializing SQLContext:
>>
>>     val sqlContext = new SQLContext(sc)
>>
>> 3) Importing the implicits package for the toDF conversion:
>>
>>     import sqlContext.implicits._
>>
>> 4) Reading the Station and Storm files:
>>
>>     val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>>     val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>>
>>     stat.foreach(println)
>>
>>     uihgf Paris 56 5
>>     asfsds *** 43 1
>>     fkwsdf London 45 6
>>     gddg ABCD 32 2
>>     grgzg *CSD 35 3
>>     gsrsn ADR* 22 4
>>
>> 5) Creating the row by segregating columns after reading the tab-delimited file, before converting into a DF:
>>
>>     val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2), x.split("\t")(3)))
>>
>> 6) Converting into a DF:
>>
>>     val station = stati.toDF()
>>
>> station.show is giving the error below:
>>
>>     17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 15)
>>     java.lang.ArrayIndexOutOfBoundsException: 1
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
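As a footnote on the original error: `java.lang.ArrayIndexOutOfBoundsException: 1` thrown from `x.split("\t")(1)` usually means at least one input line does not contain the expected four tab-separated fields (an empty or truncated line, for example). That cause is an assumption here, not something visible in the thread, but it is easy to check. A minimal sketch, reusing the `stat` RDD and the `sqlContext.implicits._` import from the quoted steps:

```scala
// Assumption: the failure comes from lines with fewer than four tab-separated fields.
// The -1 limit keeps trailing empty fields, so only genuinely short lines are flagged.
val bad = stat.filter(_.split("\t", -1).length != 4)
bad.collect().foreach(line => println(s"Malformed line: '$line'"))

// Build the tuples only from well-formed lines before converting to a DataFrame.
val stati = stat
  .map(_.split("\t", -1))
  .filter(_.length == 4)
  .map(a => (a(0), a(1), a(2), a(3)))

val station = stati.toDF("col1", "col2", "col3", "col4")
station.show()
```

If the flagged lines turn out to be real records, fixing the source file (or using the CSV reader suggested at the top of the thread) is likely a better route than silently dropping them.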