Re: Serialization error - sql UDF related

2017-02-17 Thread vaquar khan
Hi Darshan ,


When you get an org.apache.spark.SparkException: Task not serializable
exception, it means that you are using a reference to an instance of a
non-serializable class inside a transformation.

Hope the following link helps.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
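
For example, a minimal sketch of one common fix, assuming the renaming logic
can be moved into a small standalone serializable object so the UDF closure no
longer captures the enclosing application object (names are illustrative):

import org.apache.spark.sql.functions.udf

// Standalone object: nothing from the non-serializable application class is captured.
object ColumnNameUtils extends Serializable {
  def getNewColumnName(oldColName: String, appendID: Boolean): String =
    oldColName.replaceAll("\\s", "_").replaceAll("%", "_pct").replaceAllLiterally("#", "No")
}

// The lambda only references ColumnNameUtils, so the closure serializes cleanly.
val correctColNameUDF = udf((s: String) => ColumnNameUtils.getNewColumnName(s, appendID = false))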


Regards,
Vaquar khan

On Fri, Feb 17, 2017 at 9:36 PM, Darshan Pandya 
wrote:

> Hello,
>
> I am getting the famous serialization exception on running some code as
> below,
>
> val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): 
> String);
> val charReference: DataFrame = thinLong.select("char_name_id", 
> "char_name").withColumn("columnNameInDimTable", 
> correctColNameUDF(col("char_name"))).withColumn("applicable_dimension", 
> lit(dimension).cast(StringType)).distinct();
> val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
> val tableName: String = charReferenceTableName.toString
> charReference.saveAsTable(tableName, saveMode)
>
> I think it has something to do with the UDF, so I am pasting the UDF
> function as well
>
> def getNewColumnName(oldColName: String, appendID: Boolean): String = {
>   var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%", 
> "_pct").replaceAllLiterally("#", "No")
>   return newColName;
> }
>
>
> *Exception* seen is
>
> Caused by: org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:304)
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$
> doExecute$1.apply(TungstenAggregate.scala:86)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$
> doExecute$1.apply(TungstenAggregate.scala:80)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(
> package.scala:48)
> ... 73 more
> Caused by: java.io.NotSerializableException: com.nielsen.datamodel.
> converters.cip2sff.ProductDimensionSFFConverterRealApp$
> Serialization stack:
> - object not serializable (class: com.nielsen.datamodel.
> converters.cip2sff.ProductDimensionSFFConverterRealApp$, value:
> com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRe
> alApp$@247a8411)
> - field (class: com.nielsen.datamodel.converters.cip2sff.
> CommonTransformationTraits$$anonfun$1, name: $outer, type: interface
> com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
> - object (class com.nielsen.datamodel.converters.cip2sff.
> CommonTransformationTraits$$anonfun$1, )
> - field (class: org.apache.spark.sql.catalyst.
> expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface
> scala.Function1)
> - object (class org.apache.spark.sql.catalyst.
> expressions.ScalaUDF$$anonfun$2, )
> - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name:
> f, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF,
> UDF(char_name#3))
> - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name:
> child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Alias,
> UDF(char_name#3) AS columnNameInDimTable#304)
> - element of array (index: 2)
> - array (class [Ljava.lang.Object;, size 4)
> - field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
> class [Ljava.lang.Object;)
> - object (class scala.collection.mutable.ArrayBuffer,
> ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS
> columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
> - field (class: org.apache.spark.sql.execution.Project, name:
> projectList, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.execution.Project, Project
> [char_name_id#2,char_name#3,UDF(char_name#3) AS 
> columnNameInDimTable#304,PRODUCT
> AS applicable_dimension#305]
>
>
>
> --
> Sincerely,
> Darshan
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Serialization error - sql UDF related

2017-02-17 Thread Darshan Pandya
Hello,

I am getting the famous serialization exception on running some code as
below,

val correctColNameUDF = udf(getNewColumnName(_: String, false:
Boolean): String);
val charReference: DataFrame = thinLong.select("char_name_id",
"char_name").withColumn("columnNameInDimTable",
correctColNameUDF(col("char_name"))).withColumn("applicable_dimension",
lit(dimension).cast(StringType)).distinct();
val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
val tableName: String = charReferenceTableName.toString
charReference.saveAsTable(tableName, saveMode)

I think it has something to do with the UDF, so I am pasting the UDF
function as well

def getNewColumnName(oldColName: String, appendID: Boolean): String = {
  var newColName = oldColName.replaceAll("\\s", "_").replaceAll("%",
"_pct").replaceAllLiterally("#", "No")
  return newColName;
}


*Exception* seen is

Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 73 more
Caused by: java.io.NotSerializableException:
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
Serialization stack:
- object not serializable (class:
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$,
value:
com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
- field (class:
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1,
name: $outer, type: interface
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
- object (class
com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1,
)
- field (class:
org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name:
func$2, type: interface scala.Function1)
- object (class
org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name:
f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF,
UDF(char_name#3))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name:
child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias,
UDF(char_name#3) AS columnNameInDimTable#304)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 4)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,
ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS
columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
- field (class: org.apache.spark.sql.execution.Project, name: projectList,
type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project
[char_name_id#2,char_name#3,UDF(char_name#3) AS
columnNameInDimTable#304,PRODUCT AS applicable_dimension#305]



-- 
Sincerely,
Darshan


Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread shyla deshpande
Thanks TD and Marco for the feedback.

The directory referenced by SPARK_LOCAL_DIRS did not exist. After creating
that directory, it worked.

This was the first time I was trying to run spark on standalone cluster, so
I missed it.

Thanks

On Fri, Feb 17, 2017 at 12:35 PM, Tathagata Das  wrote:

> Seems like an issue with the HDFS you are using for checkpointing. Its not
> able to write data properly.
>
> On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande  > wrote:
>
>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File 
>> /checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
>> could only be replicated to 0 nodes instead of minReplication (=1).  There
>> are 0 datanode(s) running and no node(s) are excluded in this operation.
>>
>> This is the error I get when I run my spark streaming app on 2 node EC2
>> cluster, with 1 master and 1 worker.
>>
>> Works fine in local mode. Please help.
>>
>> Thanks
>>
>
>


Re: question on SPARK_WORKER_CORES

2017-02-17 Thread kant kodali
One executor per Spark slave should be fine, right? I am not really sure what
benefit one would get by starting more executors (JVMs) on one node. At the
end of the day the JVM creates native/kernel threads through system calls, so
whether those threads are spawned by one or by multiple processes I don't see
much benefit (in theory it should be the same). With different processes one
would get different address spaces in the kernel, but memory isn't an issue so far.

On Fri, Feb 17, 2017 at 5:32 PM, Alex Kozlov  wrote:

> I found in some previous CDH versions that Spark starts only one executor
> per Spark slave, and DECREASING the executor-cores in standalone makes
> the total # of executors go up.  Just my 2¢.
>
> On Fri, Feb 17, 2017 at 5:20 PM, kant kodali  wrote:
>
>> Hi Satish,
>>
>> I am using spark 2.0.2.  And no I have not passed those variables because
>> I didn't want to shoot in the dark. According to the documentation it looks
>> like SPARK_WORKER_CORES is the one which should do it. If not, can you
>> please explain how these variables inter play together?
>>
>> --num-executors
>> --executor-cores
>> –total-executor-cores
>> SPARK_WORKER_CORES
>>
>> Thanks!
>>
>>
>> On Fri, Feb 17, 2017 at 5:13 PM, Satish Lalam 
>> wrote:
>>
>>> Have you tried passing --executor-cores or –total-executor-cores as
>>> arguments, , depending on the spark version?
>>>
>>>
>>>
>>>
>>>
>>> *From:* kant kodali [mailto:kanth...@gmail.com]
>>> *Sent:* Friday, February 17, 2017 5:03 PM
>>> *To:* Alex Kozlov 
>>> *Cc:* user @spark 
>>> *Subject:* Re: question on SPARK_WORKER_CORES
>>>
>>>
>>>
>>> Standalone.
>>>
>>>
>>>
>>> On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov  wrote:
>>>
>>> What Spark mode are you running the program in?
>>>
>>>
>>>
>>> On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:
>>>
>>> when I submit a job using spark shell I get something like this
>>>
>>>
>>>
>>> [Stage 0:>(36814 + 4) / 220129]
>>>
>>>
>>>
>>> Now all I want is I want to increase number of parallel tasks running
>>> from 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
>>> conf/spark-env.sh. I though that should do it but it doesn't. It still
>>> shows me 4. any idea?
>>>
>>>
>>>
>>> Thanks much!
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Alex Kozlov
>>> (408) 507-4987
>>> (650) 887-2135 efax
>>> ale...@gmail.com
>>>
>>>
>>>
>>
>>
>
>
> --
> Alex Kozlov
> (408) 507-4987
> (650) 887-2135 efax
> ale...@gmail.com
>


Re: question on SPARK_WORKER_CORES

2017-02-17 Thread Alex Kozlov
I found in some previous CDH versions that Spark starts only one executor
per Spark slave, and DECREASING the executor-cores in standalone makes the
total # of executors go up.  Just my 2¢.

On Fri, Feb 17, 2017 at 5:20 PM, kant kodali  wrote:

> Hi Satish,
>
> I am using spark 2.0.2.  And no I have not passed those variables because
> I didn't want to shoot in the dark. According to the documentation it looks
> like SPARK_WORKER_CORES is the one which should do it. If not, can you
> please explain how these variables inter play together?
>
> --num-executors
> --executor-cores
> –total-executor-cores
> SPARK_WORKER_CORES
>
> Thanks!
>
>
> On Fri, Feb 17, 2017 at 5:13 PM, Satish Lalam 
> wrote:
>
>> Have you tried passing --executor-cores or –total-executor-cores as
>> arguments, , depending on the spark version?
>>
>>
>>
>>
>>
>> *From:* kant kodali [mailto:kanth...@gmail.com]
>> *Sent:* Friday, February 17, 2017 5:03 PM
>> *To:* Alex Kozlov 
>> *Cc:* user @spark 
>> *Subject:* Re: question on SPARK_WORKER_CORES
>>
>>
>>
>> Standalone.
>>
>>
>>
>> On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov  wrote:
>>
>> What Spark mode are you running the program in?
>>
>>
>>
>> On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:
>>
>> when I submit a job using spark shell I get something like this
>>
>>
>>
>> [Stage 0:>(36814 + 4) / 220129]
>>
>>
>>
>> Now all I want is I want to increase number of parallel tasks running
>> from 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
>> conf/spark-env.sh. I though that should do it but it doesn't. It still
>> shows me 4. any idea?
>>
>>
>>
>> Thanks much!
>>
>>
>>
>>
>>
>> --
>>
>> Alex Kozlov
>> (408) 507-4987
>> (650) 887-2135 efax
>> ale...@gmail.com
>>
>>
>>
>
>


-- 
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com


Re: question on SPARK_WORKER_CORES

2017-02-17 Thread kant kodali
Hi Satish,

I am using Spark 2.0.2. And no, I have not passed those variables because I
didn't want to shoot in the dark. According to the documentation it looks
like SPARK_WORKER_CORES is the one which should do it. If not, can you
please explain how these variables interplay?

--num-executors
--executor-cores
–total-executor-cores
SPARK_WORKER_CORES

Thanks!


On Fri, Feb 17, 2017 at 5:13 PM, Satish Lalam  wrote:

> Have you tried passing --executor-cores or –total-executor-cores as
> arguments, , depending on the spark version?
>
>
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Friday, February 17, 2017 5:03 PM
> *To:* Alex Kozlov 
> *Cc:* user @spark 
> *Subject:* Re: question on SPARK_WORKER_CORES
>
>
>
> Standalone.
>
>
>
> On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov  wrote:
>
> What Spark mode are you running the program in?
>
>
>
> On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:
>
> when I submit a job using spark shell I get something like this
>
>
>
> [Stage 0:>(36814 + 4) / 220129]
>
>
>
> Now all I want is I want to increase number of parallel tasks running from
> 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
> conf/spark-env.sh. I though that should do it but it doesn't. It still
> shows me 4. any idea?
>
>
>
> Thanks much!
>
>
>
>
>
> --
>
> Alex Kozlov
> (408) 507-4987
> (650) 887-2135 efax
> ale...@gmail.com
>
>
>


RE: question on SPARK_WORKER_CORES

2017-02-17 Thread Satish Lalam
Have you tried passing --executor-cores or --total-executor-cores as arguments,
depending on the Spark version?
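
A hedged sketch of where these knobs usually go under standalone mode (the
values are illustrative; spark.cores.max is what caps the total cores the
application may use):

import org.apache.spark.sql.SparkSession

// Standalone mode: the number of concurrently running tasks is bounded by the
// cores granted to the application, not by SPARK_WORKER_CORES alone.
// Roughly equivalent to --total-executor-cores / --executor-cores on spark-submit.
val spark = SparkSession.builder()
  .appName("parallelism-demo")
  .config("spark.cores.max", "16")      // total cores for this application
  .config("spark.executor.cores", "4")  // cores per executor
  .getOrCreate()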


From: kant kodali [mailto:kanth...@gmail.com]
Sent: Friday, February 17, 2017 5:03 PM
To: Alex Kozlov 
Cc: user @spark 
Subject: Re: question on SPARK_WORKER_CORES

Standalone.

On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov 
> wrote:
What Spark mode are you running the program in?

On Fri, Feb 17, 2017 at 4:55 PM, kant kodali 
> wrote:
when I submit a job using spark shell I get something like this


[Stage 0:>(36814 + 4) / 220129]



Now all I want is I want to increase number of parallel tasks running from 4 to 
16 so I exported an env variable called SPARK_WORKER_CORES=16 in 
conf/spark-env.sh. I though that should do it but it doesn't. It still shows me 
4. any idea?



Thanks much!




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com



Re: question on SPARK_WORKER_CORES

2017-02-17 Thread kant kodali
Standalone.

On Fri, Feb 17, 2017 at 5:01 PM, Alex Kozlov  wrote:

> What Spark mode are you running the program in?
>
> On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:
>
>> when I submit a job using spark shell I get something like this
>>
>> [Stage 0:>(36814 + 4) / 220129]
>>
>>
>> Now all I want is I want to increase number of parallel tasks running
>> from 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
>> conf/spark-env.sh. I though that should do it but it doesn't. It still
>> shows me 4. any idea?
>>
>>
>> Thanks much!
>>
>>
>>
>
>
> --
> Alex Kozlov
> (408) 507-4987
> (650) 887-2135 efax
> ale...@gmail.com
>


Re: question on SPARK_WORKER_CORES

2017-02-17 Thread Alex Kozlov
What Spark mode are you running the program in?

On Fri, Feb 17, 2017 at 4:55 PM, kant kodali  wrote:

> when I submit a job using spark shell I get something like this
>
> [Stage 0:>(36814 + 4) / 220129]
>
>
> Now all I want is I want to increase number of parallel tasks running from
> 4 to 16 so I exported an env variable called SPARK_WORKER_CORES=16 in
> conf/spark-env.sh. I though that should do it but it doesn't. It still
> shows me 4. any idea?
>
>
> Thanks much!
>
>
>


-- 
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com


question on SPARK_WORKER_CORES

2017-02-17 Thread kant kodali
when I submit a job using spark shell I get something like this

[Stage 0:>(36814 + 4) / 220129]


All I want is to increase the number of parallel tasks running from
4 to 16, so I exported an env variable called SPARK_WORKER_CORES=16 in
conf/spark-env.sh. I thought that should do it, but it doesn't. It still
shows me 4. Any idea?


Thanks much!


Re: Query data in subdirectories in Hive Partitions using Spark SQL

2017-02-17 Thread Yan Facai
Hi Abdelfatah,
How do you read these files? With spark.read.parquet or spark.sql?
Could you show some code?
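
For reference, a minimal sketch of the two places such flags are usually set
from Spark (whether the Hive reader honors them depends on the Spark version,
so treat this as a guess; the table name is illustrative):

// Same flags as in your message below, set via Spark SQL and via the Hadoop conf.
spark.sql("SET hive.mapred.supports.subdirectories=true")
spark.sql("SET mapred.input.dir.recursive=true")
spark.sparkContext.hadoopConfiguration.setBoolean("mapred.input.dir.recursive", true)

val df = spark.sql("SELECT * FROM my_db.my_partitioned_table")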


On Wed, Feb 15, 2017 at 8:47 PM, Ahmed Kamal Abdelfatah <
ahmed.abdelfa...@careem.com> wrote:

> Hi folks,
>
>
>
> How can I force spark sql to recursively get data stored in parquet format
> from subdirectories ?  In Hive, I could achieve this by setting few Hive
> configs.
>
>
>
> set hive.input.dir.recursive=true;
>
> set hive.mapred.supports.subdirectories=true;
>
> set hive.supports.subdirectories=true;
>
> set mapred.input.dir.recursive=true;
>
>
>
> I tried to set these configs through spark sql queries but I get 0 records
> all the times compared to hive which get me the expected results. I also
> put these confs in hive-site.xml file but nothing changed. How can I handle
> this issue ?
>
>
>
> Spark Version : 2.1.0
>
> I used Hive 2.1.1  on emr-5.3.1
>
>
>
> *Regards, *
>
>
>
>
> *Ahmed Kamal*
> *MTS in Data Science*
>
> *Email: **ahmed.abdelfa...@careem.com *
>
>
>
>
>


Re: How to specify default value for StructField?

2017-02-17 Thread Yan Facai
I agree with Yong Zhang;
perhaps Spark SQL with Hive could solve the problem:

http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
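
For instance, a rough sketch of reading the table through the Hive catalog
instead of directly from the ORC files (database and table names are
illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()   // needs a Hive-enabled Spark build and hive-site.xml
  .getOrCreate()

// Hive resolves the differing file schemas; Spark just reads the table.
val df = spark.table("my_db.my_orc_table")
df.select("f1", "f2").show()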




On Thu, Feb 16, 2017 at 12:42 AM, Yong Zhang  wrote:

> If it works under hive, do you try just create the DF from Hive table
> directly in Spark? That should work, right?
>
>
> Yong
>
>
> --
> *From:* Begar, Veena 
> *Sent:* Wednesday, February 15, 2017 10:16 AM
> *To:* Yong Zhang; smartzjp; user@spark.apache.org
>
> *Subject:* RE: How to specify default value for StructField?
>
>
> Thanks Yong.
>
>
>
> I know about merging the schema option.
>
> Using Hive we can read AVRO files having different schemas. And also we
> can do the same in Spark also.
>
> Similarly we can read ORC files having different schemas in Hive. But, we
> can’t do the same in Spark using dataframe. How we can do it using
> dataframe?
>
>
>
> Thanks.
>
> *From:* Yong Zhang [mailto:java8...@hotmail.com]
> *Sent:* Tuesday, February 14, 2017 8:31 PM
> *To:* Begar, Veena ; smartzjp ;
> user@spark.apache.org
> *Subject:* Re: How to specify default value for StructField?
>
>
>
> You maybe are looking for something like "spark.sql.parquet.mergeSchema"
> for ORC. Unfortunately, I don't think it is available, unless someone tells
> me I am wrong.
>
>
> You can create a JIRA to request this feature, but we all know that
> Parquet is the first citizen format
>
>
>
> Yong
>
>
> --
>
> *From:* Begar, Veena 
> *Sent:* Tuesday, February 14, 2017 10:37 AM
> *To:* smartzjp; user@spark.apache.org
> *Subject:* RE: How to specify default value for StructField?
>
>
>
> Thanks, it didn't work. Because, the folder has files from 2 different
> schemas.
> It fails with the following exception:
> org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input
> columns: [f1];
>
>
> -Original Message-
> From: smartzjp [mailto:zjp_j...@163.com ]
> Sent: Tuesday, February 14, 2017 10:32 AM
> To: Begar, Veena ; user@spark.apache.org
> Subject: Re: How to specify default value for StructField?
>
> You can try the below code.
>
> val df = spark.read.format("orc").load("/user/hos/orc_files_test_
> together")
> df.select("f1", "f2").show
>
>
>
>
>
> On 2017/2/14 
> 

Re: How to convert RDD to DF for this case -

2017-02-17 Thread Yan Facai
Hi Basu,
if all columns are separated by the delimiter "\t", the CSV parser might be a
better choice. For example:

```scala
spark.read
 .option("sep", "\t")
 .option("header", fasle)
 .option("inferSchema", true)
 .csv("/user/root/spark_demo/scala/data/Stations.txt")
```
More details in [DataFrameReader API](
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
)

Then we get two DataFrames;
you can register them and use SQL to join.
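
For example, a rough sketch of the register-and-join step (the column names
follow the ones in your message below):

```scala
// Assuming the two DataFrames read above are stationsDF and stormsDF.
stationsDF.createOrReplaceTempView("Stations")
stormsDF.createOrReplaceTempView("Storms")

val joined = spark.sql(
  """SELECT s.StationName, s.StationID
    |FROM Stations s
    |JOIN Storms t ON t.Code = s.StationKey
    |WHERE s.Temparature > 35""".stripMargin)
joined.show()
```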





On Fri, Feb 17, 2017 at 10:33 PM, Aakash Basu 
wrote:

> Hey Chris,
>
> Thanks for your quick help. Actually the dataset had issues, otherwise the
> logic I implemented was not wrong.
>
> I did this -
>
> 1)  *V.Imp *– Creating row by segregating columns after reading the
> tab delimited file before converting into DF=
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2).toInt, x.split("\t")(3).toInt))
>
>
>
> Do a take to see if it throws an error or not (this step is just for
> ensuring if everything is going fine (as it is a lazy execution, that’s
> why)=
>
> stati.take(2)
>
> *Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
> (asfsds,***,43,1))
>
> If this comes out, it means it is working fine. We can proceed.
>
> 2)  *V.Imp* - Now converting into DF=
>
> val station = stati.toDF("StationKey","StationName","Temparature","Station
> ID")
>
>
>
> Now doing a show to see how it looks like=
>
> station.show
>
> *Ans:*
>
> * +--+---+---+-+*
>
> *|StationKey|StationName|Temparature|StationID|*
>
> *+--+---+---+-+*
>
> *| uihgf|   Pune| 56|5|*
>
> *|asfsds|***| 43|1|*
>
> *|fkwsdf| Mumbai| 45|6|*
>
> *|  gddg|   ABCD| 32|2|*
>
> *| grgzg| *CSD**| 35|3|*
>
> *| gsrsn| Howrah| 22|4|*
>
> *| ffafv|***| 34|7|*
>
> *+--+---+---+-+*
>
>
>
> 3)  Do the same for the other dataset -
>
> i) val storr = stor.map(p => (p.split("\t")(0).toInt,
> p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))
>
> ii)storr.take(2)
>
> iii)   val storm = storr.toDF("ID","Name","Temp","Code")
>
> iv)   storm.show
>
>
>
>
>
> 4)  Registering as table=
>
>  val stations2 = station.registerTempTable("Stations")
>
> val storms2 = storm.registerTempTable("Storms")
>
>
>
> 5)  Querying on the joinedDF as per requirements=
>
> val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
> Stations.StationID as StationID from Stations inner join Storms on
> Storms.Code = Stations.StationKey where Stations.Temparature > 35")
>
>
>
> 6)  joinedDF.show
>
> +---+-+
>
> |StationName|StationID|
>
> +---+-+
>
> |   Pune|5|
>
> +---+-+
>
> 7)  Saving the file as CSV=
>
> joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile
> ("/user/root/spark_demo/scala/data/output/Question6Soln")
>
>
>
> Thanks,
>
> Aakash.
>
> On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi Aakash,
>>
>> You can try this:
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> val header = Array("col1", "col2", "col3", "col4")
>> val schema = StructType(header.map(StructField(_, StringType, true)))
>>
>> val statRow = stat.map(line => Row(line.split("\t"):_*))
>> val df = spark.createDataFrame(statRow, schema)
>>
>> df.show
>> +--+--+++
>> |  col1|  col2|col3|col4|
>> +--+--+++
>> | uihgf| Paris|  56|   5|
>> |asfsds|   ***|  43|   1|
>> |fkwsdf|London|  45|   6|
>> |  gddg|  ABCD|  32|   2|
>> | grgzg|  *CSD|  35|   3|
>> | gsrsn|  ADR*|  22|   4|
>> +--+--+++
>>
>> Please let me know if this works for you.
>>
>> Regards,
>> Christophe.
>>
>>
>> On 17/02/17 10:37, Aakash Basu wrote:
>>
>> Hi all,
>>
>>
>> Without using case class I tried making a DF to work on the join and
>> other filtration later. But I'm getting an ArrayIndexOutOfBoundException
>> error while doing a show of the DF.
>>
>>
>> 1)  Importing SQLContext=
>>
>> import org.apache.spark.sql.SQLContext._
>>
>> import org.apache.spark.sql.SQLContext
>>
>>
>>
>> 2)  Initializing SQLContext=
>>
>> val sqlContext = new SQLContext(sc)
>>
>>
>>
>> 3)  Importing implicits package for toDF conversion=
>>
>> import sqlContext.implicits._
>>
>>
>>
>> 4)  Reading the Station and Storm Files=
>>
>> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>>
>> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>>
>>
>>
>>
>>
>> stat.foreach(println)
>>
>>
>> uihgf   Paris   56   5
>>
>> asfsds   ***   43   

How do I increase readTimeoutMillis parameter in Spark-shell?

2017-02-17 Thread kant kodali
How do I increase the readTimeoutMillis parameter in spark-shell? In the
middle of CassandraCount, the job aborts with the following exception:

java.io.IOException: Exception during execution of SELECT count(*) FROM
"test"."hello" WHERE token("cid") > ? AND token("cid") <= ?   ALLOW
FILTERING: [ip.us-east-2.compute.internal/X.X.X.X] Timed out waiting for
server response


I also did nodetool status. It says everything is UN.
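
If it helps, this is the knob I am assuming maps onto readTimeoutMillis
(the connector's spark.cassandra.read.timeout_ms; the value and host below
are illustrative):

import org.apache.spark.SparkConf

// Assumed mapping: spark.cassandra.read.timeout_ms -> driver readTimeoutMillis.
// The same setting can be passed on the command line:
//   spark-shell --conf spark.cassandra.read.timeout_ms=300000
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.read.timeout_ms", "300000")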


Thanks,

kant


Executor tab values in Spark Application UI

2017-02-17 Thread satishl
I would like to understand the Spark Application UI's Executors tab values better.
Are the values for Input, Shuffle Read and Shuffle Write the sum of values
for all tasks across all stages?
If yes, then it appears that the value isn't much help while debugging.
Or am I missing the point of these values?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-tab-values-in-Spark-Application-UI-tp28402.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-17 Thread Tathagata Das
Seems like an issue with the HDFS you are using for checkpointing. It's not
able to write data properly.

On Thu, Feb 16, 2017 at 2:40 PM, shyla deshpande 
wrote:

> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> File 
> /checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 0 datanode(s) running and no node(s) are excluded in this operation.
>
> This is the error I get when I run my spark streaming app on 2 node EC2
> cluster, with 1 master and 1 worker.
>
> Works fine in local mode. Please help.
>
> Thanks
>


Re: Spark on Mesos with Docker in bridge networking mode

2017-02-17 Thread Michael Gummelt
There's a JIRA here: https://issues.apache.org/jira/browse/SPARK-11638

I haven't had time to look at it.

On Thu, Feb 16, 2017 at 11:00 AM, cherryii  wrote:

> I'm getting errors when I try to run my docker container in bridge
> networking
> mode on mesos.
> Here is my spark submit script
>
> /spark/bin/spark-submit \
>  --class com.package.MySparkJob \
>  --name My-Spark-Job \
>  --files /path/config.cfg, ${JAR} \
>  --master ${SPARK_MASTER_HOST} \
>  --deploy-mode client \
>  --supervise \
>  --total-executor-cores ${SPARK_EXECUTOR_TOTAL_CORES} \
>  --driver-cores ${SPARK_DRIVER_CORES} \
>  --driver-memory ${SPARK_DRIVER_MEMORY} \
>  --num-executors ${SPARK_NUM_EXECUTORS} \
>  --executor-cores ${SPARK_EXECUTOR_CORES} \
>  --executor-memory ${SPARK_EXECUTOR_MEMORY} \
>  --driver-class-path ${JAR} \
>  --conf
> "spark.mesos.executor.docker.image=${SPARK_MESOS_EXECUTOR_DOCKER_IMAGE}" \
>  --conf
> "spark.mesos.executor.docker.volumes=${SPARK_MESOS_
> EXECUTOR_DOCKER_VOLUMES}"
> \
>  --conf "spark.mesos.uris=${SPARK_MESOS_URIS}" \
>  --conf "spark.executorEnv.OBERON_DB_PASS=${OBERON_DB_PASS}" \
>  --conf "spark.executorEnv.S3_SECRET_ACCESS_KEY=${S3_SECRET_ACCESS_KEY}" \
>  --conf "spark.executorEnv.S3_ACCESS_KEY=${S3_ACCESS_KEY}" \
>  --conf "spark.mesos.executor.home=${SPARK_HOME}" \
>  --conf "spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY=${SPARK_MESOS_LIB}" \
>  --conf "spark.files.overwrite=true" \
>  --conf "spark.shuffle.service.enabled=false" \
>  --conf "spark.dynamicAllocation.enabled=false" \
>  --conf "spark.ui.port=${PORT_SPARKUI}" \
>  --conf "spark.driver.host=${SPARK_PUBLIC_DNS}" \
>  --conf "spark.driver.port=${PORT_SPARKDRIVER}" \
>  --conf "spark.driver.blockManager.port=${PORT_SPARKBLOCKMANAGER}" \
>  --conf "spark.jars=${JAR}" \
>  --conf "spark.executor.extraClassPath=${JAR}" \
>  ${JAR}
>
> Here is the error I'm seeing:
> java.net.BindException: Cannot assign requested address: Service
> 'sparkDriver' failed after 16 retries! Consider explicitly setting the
> appropriate port for the service 'sparkDriver' (for example spark.ui.port
> for SparkUI) to an available port or increasing spark.port.maxRetries.
> at sun.nio.ch.Net.bind0(Native Method)
> at sun.nio.ch.Net.bind(Net.java:433)
> at sun.nio.ch.Net.bind(Net.java:425)
> at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:
> 223)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> at
> io.netty.channel.socket.nio.NioServerSocketChannel.doBind(
> NioServerSocketChannel.java:125)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.bind(
> AbstractChannel.java:485)
> at
> io.netty.channel.DefaultChannelPipeline$HeadContext.bind(
> DefaultChannelPipeline.java:1089)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeBind(
> AbstractChannelHandlerContext.java:430)
> at
> io.netty.channel.AbstractChannelHandlerContext.bind(
> AbstractChannelHandlerContext.java:415)
> at
> io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:
> 903)
> at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
> at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(
> SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> I was trying to follow instructions here:
> https://github.com/apache/spark/pull/15120
> So in my Marathon json I'm defining the ports to use for the spark driver,
> spark ui and block manager.
>
> Can anyone help me get this running in bridge networking mode?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-on-Mesos-with-Docker-in-
> bridge-networking-mode-tp28397.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Graphx Examples for ALS

2017-02-17 Thread Irving Duran
Not sure I follow your question.  Do you want to use ALS or GraphX?
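
If it is plain ALS you are after, spark.ml already ships an implementation
that scales to large datasets; a minimal sketch (the column names and toy
data are illustrative):

import org.apache.spark.ml.recommendation.ALS

// Ratings DataFrame with user, item and rating columns.
val ratings = spark.createDataFrame(Seq(
  (0, 10, 4.0f), (0, 20, 1.0f), (1, 10, 5.0f), (1, 30, 3.0f)
)).toDF("userId", "itemId", "rating")

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(10)

val model = als.fit(ratings)
val predictions = model.transform(ratings)  // score known user/item pairs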


Thank You,

Irving Duran

On Fri, Feb 17, 2017 at 7:07 AM, balaji9058  wrote:

> Hi,
>
> Where can i find the the ALS recommendation algorithm for large data set?
>
> Please feel to share your ideas/algorithms/logic to build recommendation
> engine by using spark graphx
>
> Thanks in advance.
>
> Thanks,
> Balaji
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Graphx-Examples-for-ALS-tp28401.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread kant kodali
Oops I think that should fix it. I am going to try it now..

Great catch! I feel like an idiot.

On Fri, Feb 17, 2017 at 10:02 AM, Russell Spitzer  wrote:

> Great catch Anastasios!
>
> On Fri, Feb 17, 2017 at 9:59 AM Anastasios Zouzias 
> wrote:
>
>> Hey,
>>
>> Can you try with the 2.11 spark-cassandra-connector? You just reported
>> that you use spark-cassandra-connector*_2.10*-2.0.0-RC1.jar
>>
>> Best,
>> Anastasios
>>
>> On Fri, Feb 17, 2017 at 6:40 PM, kant kodali  wrote:
>>
>> Hi,
>>
>>
>> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map(
>> "table" -> "hello", "keyspace" -> "test" )).load()
>>
>> This line works fine. I can see it actually pulled the table schema from
>> cassandra. however when I do
>>
>> df.count I get the error below.
>>
>>
>> I am using the following jars.
>>
>> spark version 2.0.2
>>
>> spark-sql_2.11-2.0.2.jar
>>
>> spark-cassandra-connector_2.10-2.0.0-RC1.jar
>>
>> Java version 8
>>
>> scala version 2.11.8
>>
>>
>>
>> java.lang.NoClassDefFoundError: scala/runtime/
>> AbstractPartialFunction$mcJL$sp
>>
>> at java.lang.ClassLoader.defineClass1(Native Method)
>>
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>> at java.security.SecureClassLoader.defineClass(
>> SecureClassLoader.java:142)
>>
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
>> CassandraLimit.scala:21)
>>
>> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
>> compute(CassandraTableScanRDD.scala:367)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:79)
>>
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:47)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1142)
>>
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:617)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ClassNotFoundException: scala.runtime.
>> AbstractPartialFunction$mcJL$sp
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>> ... 35 more
>>
>> 17/02/17 17:35:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
>> 0)
>>
>> java.lang.NoClassDefFoundError: com/datastax/spark/connector/
>> rdd/CassandraLimit$$anonfun$limitForIterator$1
>>
>> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
>> CassandraLimit.scala:21)
>>
>> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
>> compute(CassandraTableScanRDD.scala:367)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at 

Re: I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread Russell Spitzer
Great catch Anastasios!

On Fri, Feb 17, 2017 at 9:59 AM Anastasios Zouzias 
wrote:

> Hey,
>
> Can you try with the 2.11 spark-cassandra-connector? You just reported
> that you use spark-cassandra-connector*_2.10*-2.0.0-RC1.jar
>
> Best,
> Anastasios
>
> On Fri, Feb 17, 2017 at 6:40 PM, kant kodali  wrote:
>
> Hi,
>
>
> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map(
> "table" -> "hello", "keyspace" -> "test" )).load()
>
> This line works fine. I can see it actually pulled the table schema from
> cassandra. however when I do
>
> df.count I get the error below.
>
>
> I am using the following jars.
>
> spark version 2.0.2
>
> spark-sql_2.11-2.0.2.jar
>
> spark-cassandra-connector_2.10-2.0.0-RC1.jar
>
> Java version 8
>
> scala version 2.11.8
>
>
>
> java.lang.NoClassDefFoundError:
> scala/runtime/AbstractPartialFunction$mcJL$sp
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at
> com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)
>
> at
> com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassNotFoundException:
> scala.runtime.AbstractPartialFunction$mcJL$sp
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 35 more
>
> 17/02/17 17:35:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
> java.lang.NoClassDefFoundError:
> com/datastax/spark/connector/rdd/CassandraLimit$$anonfun$limitForIterator$1
>
> at
> com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)
>
> at
> com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at 

Re: I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread Anastasios Zouzias
Hey,

Can you try with the 2.11 spark-cassandra-connector? You just reported that
you use spark-cassandra-connector*_2.10*-2.0.0-RC1.jar
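
One quick way to double-check from the running shell (the connector jar's
Scala suffix must match this):

// Prints the Scala version on the classpath, e.g. "version 2.11.8";
// the connector artifact suffix (_2.10 vs _2.11) has to match it.
println(scala.util.Properties.versionString)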

Best,
Anastasios

On Fri, Feb 17, 2017 at 6:40 PM, kant kodali  wrote:

> Hi,
>
>
> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map(
> "table" -> "hello", "keyspace" -> "test" )).load()
>
> This line works fine. I can see it actually pulled the table schema from
> cassandra. however when I do
>
> df.count I get the error below.
>
>
> I am using the following jars.
>
> spark version 2.0.2
>
> spark-sql_2.11-2.0.2.jar
>
> spark-cassandra-connector_2.10-2.0.0-RC1.jar
>
> Java version 8
>
> scala version 2.11.8
>
>
>
> java.lang.NoClassDefFoundError: scala/runtime/
> AbstractPartialFunction$mcJL$sp
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
> CassandraLimit.scala:21)
>
> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
> compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassNotFoundException: scala.runtime.
> AbstractPartialFunction$mcJL$sp
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 35 more
>
> 17/02/17 17:35:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
> java.lang.NoClassDefFoundError: com/datastax/spark/connector/
> rdd/CassandraLimit$$anonfun$limitForIterator$1
>
> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
> CassandraLimit.scala:21)
>
> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
> compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at 

I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread kant kodali
Hi,


val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map(
"table" -> "hello", "keyspace" -> "test" )).load()

This line works fine. I can see it actually pulled the table schema from
cassandra. however when I do

df.count I get the error below.


I am using the following jars.

spark version 2.0.2

spark-sql_2.11-2.0.2.jar

spark-cassandra-connector_2.10-2.0.0-RC1.jar

Java version 8

scala version 2.11.8



java.lang.NoClassDefFoundError:
scala/runtime/AbstractPartialFunction$mcJL$sp

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at
com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)

at
com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:86)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassNotFoundException:
scala.runtime.AbstractPartialFunction$mcJL$sp

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 35 more

17/02/17 17:35:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.lang.NoClassDefFoundError:
com/datastax/spark/connector/rdd/CassandraLimit$$anonfun$limitForIterator$1

at
com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)

at
com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:86)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at 

Re: skewed data in join

2017-02-17 Thread Jon Gregg
It depends how you salt it.  See slide 40 and onwards from a spark summit
talk here: http://www.slideshare.net/cloudera/top-5-mistakes-
to-avoid-when-writing-apache-spark-applications  The speakers use a mod8
integer salt appended to the end of the key, the salt that works best for
you might be different.
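
A rough sketch of that mod-8 salt on DataFrames (bigDF, smallDF and the key
column are illustrative, and it assumes Spark 2.1's crossJoin):

import org.apache.spark.sql.functions._

// Salt the skewed (big) side: append a random 0..7 suffix to the join key.
val saltedBig = bigDF.withColumn(
  "salted_key", concat(col("key"), lit("_"), (rand() * 8).cast("int").cast("string")))

// Replicate the small side once per salt value so every salted key can match.
val salts = spark.range(0, 8).withColumnRenamed("id", "salt")
val saltedSmall = smallDF.crossJoin(salts).withColumn(
  "salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))

val joined = saltedBig.join(saltedSmall, "salted_key")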

On Thu, Feb 16, 2017 at 12:40 PM, Gourav Sengupta  wrote:

> Hi,
>
> Thanks for your kind response. The hash key using random numbers increases
> the time for processing the data. My entire join for the entire month
> finishes within 150 seconds for 471 million records and then stays for
> another 6 mins for 55 million records.
>
> Using hash keys increases the processing time to 11 mins. Therefore I am
> not quite clear why should I do that. The overall idea was to ensure that
> the entire processing of around 520 million records in may be another 10
> seconds more.
>
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Feb 16, 2017 at 4:54 PM, Anis Nasir  wrote:
>
>> You can also so something similar to what is mentioned in [1].
>>
>> The basic idea is to use two hash functions for each key and assigning it
>> to the least loaded of the two hashed worker.
>>
>> Cheers,
>> Anis
>>
>>
>> [1]. https://melmeric.files.wordpress.com/2014/11/the-power-of-
>> both-choices-practical-load-balancing-for-distributed-
>> stream-processing-engines.pdf
>>
>>
>> On Fri, 17 Feb 2017 at 01:34, Yong Zhang  wrote:
>>
>>> Yes. You have to change your key, or as BigData term, "adding salt".
>>>
>>>
>>> Yong
>>>
>>> --
>>> *From:* Gourav Sengupta 
>>> *Sent:* Thursday, February 16, 2017 11:11 AM
>>> *To:* user
>>> *Subject:* skewed data in join
>>>
>>> Hi,
>>>
>>> Is there a way to do multiple reducers for joining on skewed data?
>>>
>>> Regards,
>>> Gourav
>>>
>>
>


Re: How to convert RDD to DF for this case -

2017-02-17 Thread Aakash Basu
Hey Chris,

Thanks for your quick help. Actually the dataset had issues, otherwise the
logic I implemented was not wrong.

I did this -

1)  *V.Imp *– Creating row by segregating columns after reading the tab
delimited file before converting into DF=

val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2).toInt, x.split("\t")(3).toInt))



Do a take to see if it throws an error or not (this step just ensures that
everything is going fine, since the execution is lazy)=

stati.take(2)

*Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))

If this comes out, it means it is working fine. We can proceed.

2)  *V.Imp* - Now converting into DF=

val station =
stati.toDF("StationKey","StationName","Temparature","StationID")



Now doing a show to see how it looks like=

station.show

*Ans:*

* +--+---+---+-+*

*|StationKey|StationName|Temparature|StationID|*

*+--+---+---+-+*

*| uihgf|   Pune| 56|5|*

*|asfsds|***| 43|1|*

*|fkwsdf| Mumbai| 45|6|*

*|  gddg|   ABCD| 32|2|*

*| grgzg| *CSD**| 35|3|*

*| gsrsn| Howrah| 22|4|*

*| ffafv|***| 34|7|*

*+--+---+---+-+*



3)  Do the same for the other dataset -

i) val storr = stor.map(p => (p.split("\t")(0).toInt,
p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))

ii)storr.take(2)

iii)   val storm = storr.toDF("ID","Name","Temp","Code")

iv)   storm.show





4)  Registering as table=

 val stations2 = station.registerTempTable("Stations")

val storms2 = storm.registerTempTable("Storms")



5)  Querying on the joinedDF as per requirements=

val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
Stations.StationID as StationID from Stations inner join Storms on
Storms.Code = Stations.StationKey where Stations.Temparature > 35")



6)  joinedDF.show

+---+-+

|StationName|StationID|

+---+-+

|   Pune|5|

+---+-+

7)  Saving the file as CSV=

joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")



Thanks,

Aakash.

On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

> Hi Aakash,
>
> You can try this:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
> val header = Array("col1", "col2", "col3", "col4")
> val schema = StructType(header.map(StructField(_, StringType, true)))
>
> val statRow = stat.map(line => Row(line.split("\t"):_*))
> val df = spark.createDataFrame(statRow, schema)
>
> df.show
> +--+--+++
> |  col1|  col2|col3|col4|
> +--+--+++
> | uihgf| Paris|  56|   5|
> |asfsds|   ***|  43|   1|
> |fkwsdf|London|  45|   6|
> |  gddg|  ABCD|  32|   2|
> | grgzg|  *CSD|  35|   3|
> | gsrsn|  ADR*|  22|   4|
> +--+--+++
>
> Please let me know if this works for you.
>
> Regards,
> Christophe.
>
>
> On 17/02/17 10:37, Aakash Basu wrote:
>
> Hi all,
>
>
> Without using case class I tried making a DF to work on the join and other
> filtration later. But I'm getting an ArrayIndexOutOfBoundException error
> while doing a show of the DF.
>
>
> 1)  Importing SQLContext=
>
> import org.apache.spark.sql.SQLContext._
>
> import org.apache.spark.sql.SQLContext
>
>
>
> 2)  Initializing SQLContext=
>
> val sqlContext = new SQLContext(sc)
>
>
>
> 3)  Importing implicits package for toDF conversion=
>
> import sqlContext.implicits._
>
>
>
> 4)  Reading the Station and Storm Files=
>
> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>
> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>
>
>
>
>
> stat.foreach(println)
>
>
> uihgf   Paris   56   5
>
> asfsds   ***   43   1
>
> fkwsdf   London   45   6
>
> gddg   ABCD   32   2
>
> grgzg   *CSD   35   3
>
> gsrsn   ADR*   22   4
>
>
> 5) Creating row by segregating columns after reading the tab delimited
> file before converting into DF=
>
>
> *val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2),x.split("\t")(3)))*
>
>
>
> 6)  Converting into DF=
>
> val station = stati.toDF()
>
> *station.show* is giving the below error ->
>
> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
> 15)
> java.lang.ArrayIndexOutOfBoundsException: 1
>
>
> Please help!
>
> Thanks,
> Aakash.
>
>
>
> --
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 158 Ter Rue du Temple 75003 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes 

Graphx Examples for ALS

2017-02-17 Thread balaji9058
Hi,

Where can I find the ALS recommendation algorithm for large data sets?

Please feel free to share your ideas/algorithms/logic to build a recommendation
engine using Spark GraphX.

Thanks in advance.

Thanks,
Balaji



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Examples-for-ALS-tp28401.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to convert RDD to DF for this case -

2017-02-17 Thread Christophe Préaud
Hi Aakash,

You can try this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val header = Array("col1", "col2", "col3", "col4")
val schema = StructType(header.map(StructField(_, StringType, true)))

val statRow = stat.map(line => Row(line.split("\t"):_*))
val df = spark.createDataFrame(statRow, schema)

df.show
+--+--+++
|  col1|  col2|col3|col4|
+--+--+++
| uihgf| Paris|  56|   5|
|asfsds|   ***|  43|   1|
|fkwsdf|London|  45|   6|
|  gddg|  ABCD|  32|   2|
| grgzg|  *CSD|  35|   3|
| gsrsn|  ADR*|  22|   4|
+--+--+++


Please let me know if this works for you.

Regards,
Christophe.

On 17/02/17 10:37, Aakash Basu wrote:
Hi all,

Without using case class I tried making a DF to work on the join and other 
filtration later. But I'm getting an ArrayIndexOutOfBoundException error while 
doing a show of the DF.

1)  Importing SQLContext=
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.SQLContext

2)  Initializing SQLContext=
val sqlContext = new SQLContext(sc)

3)  Importing implicits package for toDF conversion=
import sqlContext.implicits._

4)  Reading the Station and Storm Files=
val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")



stat.foreach(println)

uihgf   Paris   56   5
asfsds   ***   43   1
fkwsdf   London   45   6
gddg   ABCD   32   2
grgzg   *CSD   35   3
gsrsn   ADR*   22   4


5) Creating row by segregating columns after reading the tab delimited file 
before converting into DF=

val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1), 
x.split("\t")(2),x.split("\t")(3)))


6)  Converting into DF=
val station = stati.toDF()

station.show is giving the below error ->

17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 15)
java.lang.ArrayIndexOutOfBoundsException: 1


Please help!

Thanks,
Aakash.



Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 158 Ter Rue du Temple 75003 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively
for their addressees. If you are not the intended recipient of this message,
please delete it and notify the sender.


how to give hdfs file path as argument to spark-submit

2017-02-17 Thread nancy henry
Hi All,


object Step1 {
  def main(args: Array[String]) = {

val sparkConf = new SparkConf().setAppName("my-app")
val sc = new SparkContext(sparkConf)

val hiveSqlContext: HiveContext = new
org.apache.spark.sql.hive.HiveContext(sc)

hiveSqlContext.sql(scala.io.Source.fromFile(args(0)).mkString)

System.out.println("Okay")

  }

}



This is my Spark program, and my Hive script is at args(0).

$SPARK_HOME/bin/./spark-submit --class com.spark.test.Step1 --master yarn
--deploy-mode cluster com.spark.test-0.1-SNAPSHOT.jar
 hdfs://spirui-d86-f03-06:9229/samples/testsubquery.hql

but a FileNotFoundException is coming.

Why?

Where is it expecting the file to be?
Local or HDFS?
If HDFS, how should I give its path?

And is there any better way with HiveContext than using this to read a query
from a file on HDFS?
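
Or is something like the following the right way to read the script from HDFS
inside the driver (untested sketch, using the Hadoop FileSystem API)?

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Read the HQL script from HDFS instead of the local filesystem.
val fs = FileSystem.get(sc.hadoopConfiguration)
val query = Source.fromInputStream(fs.open(new Path(args(0)))).mkString
hiveSqlContext.sql(query)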