Hi Basu,
If all columns are separated by the delimiter "\t", the CSV parser might be a
better choice.
For example:

```scala
spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
```
More details in the [DataFrameReader API](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader).

Then we get two DataFrames; you can register them and use SQL to join them.
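Putting it together, a rough sketch might look like this (the column names
below are only taken from your earlier mail, and spark is assumed to be the
SparkSession):

```scala
// Read both tab-delimited files with the built-in CSV reader and name the
// columns (the names just mirror those used further down in this thread).
val stations = spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
  .toDF("StationKey", "StationName", "Temparature", "StationID")

val storms = spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Storms.txt")
  .toDF("ID", "Name", "Temp", "Code")

// Register both DataFrames as temporary views and join them with SQL.
stations.createOrReplaceTempView("Stations")
storms.createOrReplaceTempView("Storms")

val joinedDF = spark.sql(
  """SELECT s.StationName, s.StationID
    |FROM Stations s
    |JOIN Storms t ON t.Code = s.StationKey
    |WHERE s.Temparature > 35""".stripMargin)

joinedDF.show()
```
With inferSchema enabled, the numeric columns are read as integers, so the
comparison on Temparature works without any manual toInt calls.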





On Fri, Feb 17, 2017 at 10:33 PM, Aakash Basu <aakash.spark....@gmail.com>
wrote:

> Hey Chris,
>
> Thanks for your quick help. Actually the dataset had issues, otherwise the
> logic I implemented was not wrong.
>
> I did this -
>
> 1)      *V.Imp* – Creating rows by segregating the columns after reading the
> tab-delimited file, before converting into a DF=
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2).toInt, x.split("\t")(3).toInt))
>
>
>
> Do a take to see whether it throws an error (this step is just to ensure
> everything is going fine, since execution is lazy)=
>
> stati.take(2)
>
> *Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
> (asfsds,***,43,1))
>
> If this comes out, it means it is working fine. We can proceed.
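>
> A slightly tidier variant of the same step splits each line only once (just
> a sketch of the same logic):
>
> val stati = stat.map { x =>
>   val cols = x.split("\t")
>   (cols(0), cols(1), cols(2).toInt, cols(3).toInt)
> }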
>
> 2)      *V.Imp* - Now converting into DF=
>
> val station = stati.toDF("StationKey","StationName","Temparature","StationID")
>
>
>
> Now doing a show to see how it looks like=
>
> station.show
>
> *Ans:*
>
> +----------+-----------+-----------+---------+
> |StationKey|StationName|Temparature|StationID|
> +----------+-----------+-----------+---------+
> |     uihgf|       Pune|         56|        5|
> |    asfsds|        ***|         43|        1|
> |    fkwsdf|     Mumbai|         45|        6|
> |      gddg|       ABCD|         32|        2|
> |     grgzg|     *CSD**|         35|        3|
> |     gsrsn|     Howrah|         22|        4|
> |     ffafv|        ***|         34|        7|
> +----------+-----------+-----------+---------+
>
>
>
> 3)      Do the same for the other dataset -
>
> i)                 val storr = stor.map(p => (p.split("\t")(0).toInt,
> p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))
>
> ii)                storr.take(2)
>
> iii)               val storm = storr.toDF("ID","Name","Temp","Code")
>
> iv)               storm.show
>
>
>
>
>
> 4)      Registering as table=
>
>  val stations2 = station.registerTempTable("Stations")
>
> val storms2 = storm.registerTempTable("Storms")
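>
> (Side note: registerTempTable returns Unit, so the two vals above are not
> really needed. On Spark 2.x the equivalent call, just as a sketch, would be:)
>
> station.createOrReplaceTempView("Stations")
> storm.createOrReplaceTempView("Storms")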
>
>
>
> 5)      Querying on the joinedDF as per requirements=
>
> val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
> Stations.StationID as StationID from Stations inner join Storms on
> Storms.Code = Stations.StationKey where Stations.Temparature > 35")
>
>
>
> 6)      joinedDF.show
>
> +-----------+---------+
> |StationName|StationID|
> +-----------+---------+
> |       Pune|        5|
> +-----------+---------+
>
> 7)      Saving the file as CSV=
>
> joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile
> ("/user/root/spark_demo/scala/data/output/Question6Soln")
>
>
>
> Thanks,
>
> Aakash.
>
> On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi Aakash,
>>
>> You can try this:
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> val header = Array("col1", "col2", "col3", "col4")
>> val schema = StructType(header.map(StructField(_, StringType, true)))
>>
>> val statRow = stat.map(line => Row(line.split("\t"):_*))
>> val df = spark.createDataFrame(statRow, schema)
>>
>> df.show
>> +------+------+----+----+
>> |  col1|  col2|col3|col4|
>> +------+------+----+----+
>> | uihgf| Paris|  56|   5|
>> |asfsds|   ***|  43|   1|
>> |fkwsdf|London|  45|   6|
>> |  gddg|  ABCD|  32|   2|
>> | grgzg|  *CSD|  35|   3|
>> | gsrsn|  ADR*|  22|   4|
>> +------+------+----+----+
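>>
>> If some lines have fewer than four tab-separated fields (that is usually
>> what causes an ArrayIndexOutOfBoundsException when indexing the result of
>> split), you could also drop them before building the Rows, for example
>> (just a sketch):
>>
>> val statRow = stat.map(_.split("\t", -1)).filter(_.length == header.length).map(fields => Row(fields: _*))
>> val df = spark.createDataFrame(statRow, schema)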
>>
>> Please let me know if this works for you.
>>
>> Regards,
>> Christophe.
>>
>>
>> On 17/02/17 10:37, Aakash Basu wrote:
>>
>> Hi all,
>>
>>
>> Without using a case class, I tried making a DF to work on the join and
>> other filtering later. But I'm getting an ArrayIndexOutOfBoundsException
>> error while doing a show of the DF.
>>
>>
>> 1)      Importing SQLContext=
>>
>> import org.apache.spark.sql.SQLContext._
>>
>> import org.apache.spark.sql.SQLContext
>>
>>
>>
>> 2)      Initializing SQLContext=
>>
>> val sqlContext = new SQLContext(sc)
>>
>>
>>
>> 3)      Importing implicits package for toDF conversion=
>>
>> import sqlContext.implicits._
>>
>>
>>
>> 4)      Reading the Station and Storm Files=
>>
>> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>>
>> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>>
>>
>>
>>
>>
>> stat.foreach(println)
>>
>>
>> uihgf   Paris   56   5
>> asfsds   ***   43   1
>> fkwsdf   London   45   6
>> gddg   ABCD   32   2
>> grgzg   *CSD   35   3
>> gsrsn   ADR*   22   4
>>
>>
>> 5) Creating rows by segregating the columns after reading the tab-delimited
>> file, before converting into a DF=
>>
>>
>> *val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
>> x.split("\t")(2),x.split("\t")(3)))*
>>
>>
>>
>> 6)      Converting into DF=
>>
>> val station = stati.toDF()
>>
>> *station.show* is giving the below error ->
>>
>> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
>> 15)
>> java.lang.ArrayIndexOutOfBoundsException: 1
>>
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>>
>>
>
>
