Best Practices for Spark Join

2016-06-01 Thread Aakash Basu
Hi,

Can you please list, in order of importance, the best practices (necessary or recommended to follow) for doing a Spark join?

Thanks,
Aakash.


Python to Scala

2016-06-17 Thread Aakash Basu
Hi all,

I've a python code, which I want to convert to Scala for using it in a
Spark program. I'm not so well acquainted with python and learning scala
now. Any Python+Scala expert here? Can someone help me out in this please?

Thanks & Regards,
Aakash.


Re: Python to Scala

2016-06-17 Thread Aakash Basu
Hey,

Our complete project is in Spark on Scala, I code in Scala for Spark,
though am new, but I know it and still learning. But I need help in
converting this code to Scala. I've nearly no knowledge in Python, hence,
requested the experts here.

Hope you get me now.

Thanks,
Aakash.
On 18-Jun-2016 10:07 AM, "Yash Sharma" <yash...@gmail.com> wrote:

> You could use pyspark to run the python code on spark directly. That will
> cut the effort of learning scala.
>
> https://spark.apache.org/docs/0.9.0/python-programming-guide.html
>
> - Thanks, via mobile,  excuse brevity.
> On Jun 18, 2016 2:34 PM, "Aakash Basu" <raj2coo...@gmail.com> wrote:
>
>> Hi all,
>>
>> I've a python code, which I want to convert to Scala for using it in a
>> Spark program. I'm not so well acquainted with python and learning scala
>> now. Any Python+Scala expert here? Can someone help me out in this please?
>>
>> Thanks & Regards,
>> Aakash.
>>
>


Re: Python to Scala

2016-06-17 Thread Aakash Basu
I don't have sound knowledge of Python, and on the other hand we are working
with Spark on Scala, so I don't think running PySpark alongside it will be
allowed. The requirement is therefore to convert the code to Scala and use it,
but I'm finding it difficult.

I did not find a better forum for help than ours, hence this mail.
On 18-Jun-2016 10:39 AM, "Stephen Boesch" <java...@gmail.com> wrote:

> What are you expecting us to do?  Yash provided a reasonable approach -
> based on the info you had provided in prior emails. Otherwise you can
> convert it from Python to Scala yourself - or find someone else who feels
> comfortable doing it. That kind of inquiry would likely be appropriate on a
> job board.
>
>
>
> 2016-06-17 21:47 GMT-07:00 Aakash Basu <raj2coo...@gmail.com>:
>
>> Hey,
>>
>> Our complete project is in Spark on Scala, I code in Scala for Spark,
>> though am new, but I know it and still learning. But I need help in
>> converting this code to Scala. I've nearly no knowledge in Python, hence,
>> requested the experts here.
>>
>> Hope you get me now.
>>
>> Thanks,
>> Aakash.
>> On 18-Jun-2016 10:07 AM, "Yash Sharma" <yash...@gmail.com> wrote:
>>
>>> You could use pyspark to run the python code on spark directly. That
>>> will cut the effort of learning scala.
>>>
>>> https://spark.apache.org/docs/0.9.0/python-programming-guide.html
>>>
>>> - Thanks, via mobile,  excuse brevity.
>>> On Jun 18, 2016 2:34 PM, "Aakash Basu" <raj2coo...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've a python code, which I want to convert to Scala for using it in a
>>>> Spark program. I'm not so well acquainted with python and learning scala
>>>> now. Any Python+Scala expert here? Can someone help me out in this please?
>>>>
>>>> Thanks & Regards,
>>>> Aakash.
>>>>
>>>
>


Re: spark job automatically killed without rhyme or reason

2016-06-23 Thread Aakash Basu
Hey,

I've come across this. There's a command, "yarn application -kill
<application_id>", which kills the application and leaves a one-liner, 'Killed'.

If it were a memory issue, the error would instead show up as something like
'GC overhead limit exceeded' or a similar message.

So I think someone killed your job with that command. To the person running
the job, the log will just show that single word, 'Killed', at the end.

Maybe this is what you faced. Maybe!

Thanks,
Aakash.
On 23-Jun-2016 11:52 AM, "Zhiliang Zhu"  wrote:

> Thanks a lot for all the comments and the useful information.
>
> Yes, I have a lot of experience writing and running Spark jobs; something
> unstable tends to appear when a job runs on more data or for a longer time.
> Sometimes a job is not okay when some parameter is reset on the command line
> but is okay when it is removed and the default setting is used. Sometimes it
> is the opposite, and a proper parameter value needs to be set.
>
> Spark 1.5 was installed here by another person.
>
>
>
>
> On Wednesday, June 22, 2016 1:59 PM, Nirav Patel 
> wrote:
>
>
> Spark is a memory hogger and can be suicidal if you have a job processing a
> bigger dataset. However, Databricks claims that Spark > 1.6 has optimizations
> related to memory footprint as well as processing. They are only available
> if you use DataFrames or Datasets; if you are using RDDs you have to do a
> lot of testing and tuning.
>
> On Mon, Jun 20, 2016 at 1:34 AM, Sean Owen  wrote:
>
> I'm not sure that's the conclusion. It's not trivial to tune and
> configure YARN and Spark to match your app's memory needs and profile,
> but, it's also just a matter of setting them properly. I'm not clear
> you've set the executor memory for example, in particular
> spark.yarn.executor.memoryOverhead
>
> Everything else you mention is a symptom of YARN shutting down your
> jobs because your memory settings don't match what your app does.
> They're not problems per se, based on what you have provided.
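
As a rough illustration of the knobs Sean refers to, here is a minimal Scala sketch of pinning the executor heap and the YARN memory overhead before the SparkContext is created; the values are placeholders to be sized for the actual job and cluster, not recommendations.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: executor heap plus the off-heap overhead YARN reserves per executor.
val conf = new SparkConf()
  .setAppName("memory-tuning-sketch")
  .set("spark.executor.memory", "4g")                 // JVM heap per executor
  .set("spark.yarn.executor.memoryOverhead", "1024")  // extra MB per executor container
// spark.driver.memory usually has to go on spark-submit (--driver-memory),
// because the driver JVM is already running by the time this code executes.
val sc = new SparkContext(conf)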
>
>
> On Mon, Jun 20, 2016 at 9:17 AM, Zhiliang Zhu
>  wrote:
> > Hi Alexander ,
> >
> > Thanks a lot for your comments.
> >
> > Spark seems not that stable when it comes to running a big job (too much
> > data or too much time), and yes, the problem is gone when reducing the
> > scale. Sometimes resetting a job parameter (such as --driver-memory) may
> > help with a GC issue; sometimes the code may need to be rewritten with a
> > different algorithm.
> >
> > As you commented on the shuffle operation, that does sound like the reason ...
> >
> > Best Wishes !
> >
> >
> >
> > On Friday, June 17, 2016 8:45 PM, Alexander Kapustin 
> > wrote:
> >
> >
> > Hi Zhiliang,
> >
> > Yes, finding the exact reason for a failure is very difficult. We had an
> > issue with similar behavior; due to limited time for investigation, we
> > reduced the amount of processed data and the problem went away.
> >
> > Some points which may help you in your investigation:
> > · If you start the spark-history-server (or monitor the running
> > application on port 4040), look into the failed stages (if any). By default
> > Spark retries stage execution 2 times; after that the job fails.
> > · Some useful information may be contained in the YARN logs on the Hadoop
> > nodes (yarn--nodemanager-.log), but this is only information about the
> > killed container, not about the reasons why the stage took so much memory.
> >
> > As I can see in your logs, the failed step relates to a shuffle operation;
> > could you change your job to avoid the massive shuffle operation?
> >
> > --
> > WBR, Alexander
> >
> > From: Zhiliang Zhu
> > Sent: 17 June 2016 14:10
> > To: User; kp...@hotmail.com
> > Subject: Re: spark job automatically killed without rhyme or reason
> >
> >
> >
> >
> > Hi Alexander,
> >
> > Is your YARN userlog just for the executor log?
> >
> > Those logs seem to make it a little difficult to pinpoint exactly where
> > things went wrong, because sometimes a successful job may also have some
> > kinds of errors ... but will repair itself.
> > Spark seems not that stable currently ...
> >
> > Thank you in advance~
> >
> >
> >
> > On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu 
> wrote:
> >
> >
> > Hi Alexander,
> >
> > Thanks a lot for your reply.
> >
> > Yes, submitted by yarn.
> > Do you just mean in the executor log file by way of yarn logs
> -applicationId
> > id,
> >
> > in this file, both in some containers' stdout  and stderr :
> >
> > 16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive
> > connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
> > 16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while
> > beginning fetch of 1 outstanding blocks
> > java.io.IOException: Failed to connect to
> > ip-172-31-20-104/172.31.20.104:49991  <-- may it be due
> to
> > that spark is not stable, and spark may repair itself for these kinds of
> > error ? (saw 

Spark JOIN Not working

2016-05-24 Thread Aakash Basu
Hi experts,

I'm extremely new to the Spark ecosystem, hence I need some help from you guys.
I'm trying to fetch data from CSV files and query them with joins as needed.
When I cache the data using registerTempTable and then use a SELECT query with
the given condition, I get the data. But when I try to do the same thing using
a JOIN, I get zero results.

Both attached files are essentially the same code, with a few differences:
Running_Code.scala uses the SELECT query and ProductHierarchy_Dimension.scala
uses the JOIN queries.

Please help me out with this; I've been stuck for two long days.

Thanks,
Aakash.


ProductHierarchy_Dimension.scala
Description: Binary data


Running_Code.scala
Description: Binary data


Pros and Cons

2016-05-25 Thread Aakash Basu
Hi,



I'm new to the Spark ecosystem and need to understand the *pros and cons* of
fetching data using *SparkSQL vs Hive in Spark vs the Spark API.*



*PLEASE HELP!*



Thanks,

Aakash Basu.


Unsubscribe

2016-08-09 Thread Aakash Basu



Re: Little idea needed

2016-07-20 Thread Aakash Basu
Thanks for the detailed description, buddy. But this will actually be done
through NiFi (end to end), so we need to add the delta logic inside NiFi to
automate the whole process.

That's why we need a good (ideally the best) solution to this problem, since
this is a classic issue we can face at any company we work with.
On 20-Jul-2016 1:38 AM, "Mich Talebzadeh" <mich.talebza...@gmail.com> wrote:

> Well this is a classic.
>
> The initial load can be done through Sqoop (outside of Spark) or through
> JDBC connection in Spark. 10 million rows is nothing.
>
> Then you have to think of updates and deletes in addition to new rows.
>
> With Sqoop you can load from the last ID in the source table, assuming
> that you have a unique key in your Oracle table.
>
> If you have 10 new rows, I assume you know how to load these rows from
> Oracle.
>
> I suggest that you add two additional columns to your HDFS/target table,
>
> ,op_type int
> ,op_time timestamp
>
> These two columns will specify the row type op_type = 1,2,3
> INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp())
> AS op_time) when the record was added.
>
> So you will end up with two additional columns in your HDFS table compared
> to Oracle table and that will be your staging table.
>
> Of course you can do real time analytics through Oracle GoldenGate that
> read the redolog of the source table in Oracle or better Sap Replication
> Server (SRS). You will achieve real-time integration between RDBMS tables
> and Big Data.
>
> Once you have the staging table (immutable), the rest is
> pretty easy. You have the full Entity Life History in this case for records
> and you can do your queries on them.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 July 2016 at 20:27, Aakash Basu <raj2coo...@gmail.com> wrote:
>
>> Hi all,
>>
>> I'm trying to pull a full table from oracle, which is huge with some 10
>> million records which will be the initial load to HDFS.
>>
>> Then I will do delta loads everyday in the same folder in HDFS.
>>
>> Now, my query here is,
>>
>> DAY 0 - I did the initial load (full dump).
>>
>> DAY 1 - I'll load only that day's data which has suppose 10 records (5
>> old with some column's value altered and 5 new).
>>
>> Here my question is: how will I push this file to HDFS through Spark
>> code? If I append, it will create duplicates (which I don't want). If I
>> keep separate files, then the other programs that use the data are given
>> the path of the folder which contains all the files; but in this case the
>> registerTempTable will also have duplicates for those 5 old rows.
>>
>> What is the BEST logic to be applied here?
>>
>> I tried to resolve this by searching that file for matching records and
>> loading the new ones after deleting the old, but this will be time
>> consuming for such a huge number of records, right?
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>
>
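
A rough Scala sketch of the immutable staging table Mich describes above; the JDBC connection details, paths, and the last-loaded-ID filter are placeholders, not values from the thread.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, lit}

val spark = SparkSession.builder.appName("delta-staging").getOrCreate()

// Pull only the new rows from Oracle (the initial full dump was loaded earlier).
val delta = spark.read.format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
  .option("dbtable", "(select * from SRC_TABLE where ID > 1000000) delta_rows") // last loaded ID is a placeholder
  .option("user", "user").option("password", "password")
  .load()

// Tag each incoming row with an operation type and load time, then append to the
// immutable staging area; consumers later keep the latest op_time per key and
// drop keys whose latest op_type is a delete.
val staged = delta
  .withColumn("op_type", lit(1))              // 1 = INSERT, 2 = UPDATE, 3 = DELETE
  .withColumn("op_time", current_timestamp())

staged.write.mode("append").parquet("/data/staging/src_table")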


Re: Little idea needed

2016-07-20 Thread Aakash Basu
Regarding your second point: that's going to be a bottleneck for all the
programs which fetch the data from that folder and then have to add extra
filters to the DataFrame again. I want to finish that off there itself.

And that merge logic is weak when one table is huge and the other is very
small (which is the case here); it literally gulps memory and time.

And the business won't allow Hive or anything else to be used at all, since we
may shift to EMR, where Hive may have compatibility issues (need to check).
On 20-Jul-2016 1:27 AM, "Jörn Franke" <jornfra...@gmail.com> wrote:

Well, as far as I know there is some update statement planned for Spark, but
I'm not sure which release. You could alternatively use Hive+ORC.
Another alternative would be to add the deltas in a separate file and, when
accessing the table, filter out the double entries. From time to time you
could have a merge process creating one file out of all the deltas.
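
A minimal Scala sketch of that filter-out-the-doubles idea, assuming a business key column pk and a load_ts timestamp stamped onto every delta when it is appended (both names are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder.appName("delta-dedup").getOrCreate()

// Full dump plus all appended delta files.
val all = spark.read.parquet("/data/staging/src_table")

// Keep only the most recently loaded version of each key.
val latestPerKey = Window.partitionBy(col("pk")).orderBy(col("load_ts").desc)
val current = all
  .withColumn("rn", row_number().over(latestPerKey))
  .filter(col("rn") === 1)
  .drop("rn")

current.createOrReplaceTempView("src_table_current") // registerTempTable on Spark 1.x

// Occasional compaction ("merge process"): rewrite one consolidated copy elsewhere.
current.write.mode("overwrite").parquet("/data/compacted/src_table")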

On 19 Jul 2016, at 21:27, Aakash Basu <raj2coo...@gmail.com> wrote:

Hi all,

I'm trying to pull a full table from oracle, which is huge with some 10
million records which will be the initial load to HDFS.

Then I will do delta loads everyday in the same folder in HDFS.

Now, my query here is,

DAY 0 - I did the initial load (full dump).

DAY 1 - I'll load only that day's data which has suppose 10 records (5 old
with some column's value altered and 5 new).

Here my question is: how will I push this file to HDFS through Spark code? If
I append, it will create duplicates (which I don't want). If I keep separate
files, then the other programs that use the data are given the path of the
folder which contains all the files; but in this case the registerTempTable
will also have duplicates for those 5 old rows.

What is the BEST logic to be applied here?

I tried to resolve this by searching that file for matching records and
loading the new ones after deleting the old, but this will be time consuming
for such a huge number of records, right?

Please help!

Thanks,
Aakash.


Little idea needed

2016-07-19 Thread Aakash Basu
Hi all,

I'm trying to pull a full table from oracle, which is huge with some 10
million records which will be the initial load to HDFS.

Then I will do delta loads everyday in the same folder in HDFS.

Now, my query here is,

DAY 0 - I did the initial load (full dump).

DAY 1 - I'll load only that day's data which has suppose 10 records (5 old
with some column's value altered and 5 new).

Here my question is: how will I push this file to HDFS through Spark code? If
I append, it will create duplicates (which I don't want). If I keep separate
files, then the other programs that use the data are given the path of the
folder which contains all the files; but in this case the registerTempTable
will also have duplicates for those 5 old rows.

What is the BEST logic to be applied here?

I tried to resolve this by searching that file for matching records and
loading the new ones after deleting the old, but this will be time consuming
for such a huge number of records, right?

Please help!

Thanks,
Aakash.


Re: How to convert RDD to DF for this case -

2017-02-17 Thread Aakash Basu
Hey Chris,

Thanks for your quick help. Actually the dataset had issues, otherwise the
logic I implemented was not wrong.

I did this -

1)  *V.Imp *– Creating row by segregating columns after reading the tab
delimited file before converting into DF=

val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2).toInt, x.split("\t")(3).toInt))



Do a take to see whether it throws an error or not (this step is just to
ensure everything is going fine, since execution is lazy)=

stati.take(2)

*Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))

If this comes out, it means it is working fine. We can proceed.

2)  *V.Imp* - Now converting into DF=

val station =
stati.toDF("StationKey","StationName","Temparature","StationID")



Now doing a show to see how it looks like=

station.show

*Ans:*

+----------+-----------+-----------+---------+
|StationKey|StationName|Temparature|StationID|
+----------+-----------+-----------+---------+
|     uihgf|       Pune|         56|        5|
|    asfsds|        ***|         43|        1|
|    fkwsdf|     Mumbai|         45|        6|
|      gddg|       ABCD|         32|        2|
|     grgzg|      *CSD*|         35|        3|
|     gsrsn|     Howrah|         22|        4|
|     ffafv|        ***|         34|        7|
+----------+-----------+-----------+---------+



3)  Do the same for the other dataset -

i) val storr = stor.map(p => (p.split("\t")(0).toInt,
p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))

ii)storr.take(2)

iii)   val storm = storr.toDF("ID","Name","Temp","Code")

iv)   storm.show





4)  Registering as table=

 val stations2 = station.registerTempTable("Stations")

val storms2 = storm.registerTempTable("Storms")



5)  Querying on the joinedDF as per requirements=

val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
Stations.StationID as StationID from Stations inner join Storms on
Storms.Code = Stations.StationKey where Stations.Temparature > 35")



6)  joinedDF.show

+-----------+---------+
|StationName|StationID|
+-----------+---------+
|       Pune|        5|
+-----------+---------+

7)  Saving the file as CSV=

joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")



Thanks,

Aakash.

On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

> Hi Aakash,
>
> You can try this:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
> val header = Array("col1", "col2", "col3", "col4")
> val schema = StructType(header.map(StructField(_, StringType, true)))
>
> val statRow = stat.map(line => Row(line.split("\t"):_*))
> val df = spark.createDataFrame(statRow, schema)
>
> df.show
> +------+------+----+----+
> |  col1|  col2|col3|col4|
> +------+------+----+----+
> | uihgf| Paris|  56|   5|
> |asfsds|   ***|  43|   1|
> |fkwsdf|London|  45|   6|
> |  gddg|  ABCD|  32|   2|
> | grgzg|  *CSD|  35|   3|
> | gsrsn|  ADR*|  22|   4|
> +------+------+----+----+
>
> Please let me know if this works for you.
>
> Regards,
> Christophe.
>
>
> On 17/02/17 10:37, Aakash Basu wrote:
>
> Hi all,
>
>
> Without using case class I tried making a DF to work on the join and other
> filtration later. But I'm getting an ArrayIndexOutOfBoundException error
> while doing a show of the DF.
>
>
> 1)  Importing SQLContext=
>
> import org.apache.spark.sql.SQLContext._
>
> import org.apache.spark.sql.SQLContext
>
>
>
> 2)  Initializing SQLContext=
>
> val sqlContext = new SQLContext(sc)
>
>
>
> 3)  Importing implicits package for toDF conversion=
>
> import sqlContext.implicits._
>
>
>
> 4)  Reading the Station and Storm Files=
>
> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>
> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>
>
>
>
>
> stat.foreach(println)
>
>
> uihgf   Paris   56   5
>
> asfsds   ***   43   1
>
> fkwsdf   London   45   6
>
> gddg   ABCD   32   2
>
> grgzg   *CSD   35   3
>
> gsrsn   ADR*   22   4
>
>
> 5) Creating row by segregating columns after reading the tab delimited
> file before converting into DF=
>
>
> *val stati = stat.map(x => (x.split("

Re: Get S3 Parquet File

2017-02-23 Thread Aakash Basu
Hey,

Please recheck the access key and secret key being used to fetch the parquet
file. It seems to be a credential error: either a mismatch or a problem loading
the credentials. If it is a loading problem, first set them directly in code
and see whether the issue resolves; afterwards they can be hidden and read from
input parameters.

Thanks,
Aakash.
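
For reference, a minimal Scala sketch of what "set them directly in code" can look like with the s3a connector seen in the stack trace; sc and sqlContext are assumed to already exist (e.g. in spark-shell), and the keys, bucket and path are placeholders.

// Hand the credentials straight to the Hadoop/S3A layer before the first read.
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
// hadoopConf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com") // only if a non-default endpoint is needed

val df = sqlContext.read.parquet("s3a://your-bucket/path/to/file.parquet")
println(df.count()) // the action that previously failed should now find credentials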


On 23-Feb-2017 11:54 PM, "Benjamin Kim"  wrote:

We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet
file from AWS S3. We can read the schema and show some data when the file
is loaded into a DataFrame, but when we try to do some operations, such as
count, we get this error below.

com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Can anyone help?

Cheers,
Ben




Fwd: Need some help

2016-09-01 Thread Aakash Basu
-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Thu, Aug 25, 2016 at 10:06 PM
Subject: Need some help
To: user@spark.apache.org


Hi all,

Aakash here, need a little help in KMeans clustering.

This is needed to be done:

"Implement Kmeans Clustering Algorithm without using the libraries of
Spark. You're given a txt file with object ids and features from which you
have to use the features as your data points. This will be a part of the
code itself"

PFA the file with ObjectIDs and features. Now how to go ahead and work on
it?

Thanks,
Aakash.
clueweb12-tw-00-03428	0.819039 -0.40844217 0.1208266 0.082789585
-0.2421226 -0.1707348 -0.38008857 0.1938118 -0.217733 -0.11316321 0.22536139 
0.4077712 0.5106064 0.0691058 0.10968939 -0.2776644 -0.5323738 -0.117045596 
0.23160939 0.0968846 -6.479684 0.280832 0.1053532 0.258626 -0.1394934 
-0.04401499 -0.06274801 0.2977866 0.23100719 -0.1442094 -0.1190624 -0.018465001 
-0.5228338 -0.090049796 0.23440179 0.4241498 -0.41945544 -0.37678298 
-0.085718594 0.0114066005 -0.11727621 -0.283434 0.368738 0.2701438 -0.2666412 
-0.1634044 0.2432622 0.49877137 0.3270268 -0.7572574
clueweb12-tw-00-03680	-0.063763246 0.060122482 0.25039256
-0.17695262 -0.024269182 -0.060460586 -0.020093922 -0.28145245 -0.119478844 
-0.22801346 -0.0019172033 0.10361874 -0.22672825 -0.17311707 0.18358645 
0.07715805 -0.14939435 -0.19412045 -0.034667462 -0.044996627 -5.5738134 
0.11706767 0.1936782 -0.027793365 -0.22054577 0.16990958 -0.03664338 0.3563341 
0.030425504 0.15397832 0.015848804 0.18880104 0.15031552 0.0662723 0.06305552 
0.017769573 0.099713035 -0.05385251 0.086493894 0.055057835 0.106260784 
-0.066389546 -0.13271035 -0.11731695 -0.12733212 -0.16161665 -0.13481794 
0.14648221 0.041699838 -0.06707647
clueweb12-tw-00-04733	-0.30487683 0.37906706 0.092391066 -0.12356548
0.041434832 0.053371474 -0.061796933 -0.34376934 -0.15945148 0.121789776 
0.05491904 0.07184038 -0.13218853 -0.26488 0.09069567 0.18619555 -0.20166355 
-0.42629552 0.04779238 0.07399226 -6.007872 -0.10489178 0.058998298 0.031324565 
-0.045885365 -0.3257782 -0.058766462 0.04142299 -0.024721975 0.15923695 
0.01233 -0.030803397 0.19786847 0.21469156 -0.16236338 -0.13572672 
0.1979717 0.010117755 0.21812446 9.308494E-4 0.11536124 -0.044362586 -0.2429856 
-0.1789137 0.074494615 0.0022599115 -0.06896331 0.060051132 -0.16935208 
0.05135853
clueweb12-tw-00-05462	-0.1689229 0.044400293 0.074416816 0.16745372
-0.047404937 -0.07548128 -0.16308217 -0.04896295 0.09722823 -0.06403786 
0.04868864 0.012745747 0.01701884 -0.20373678 0.14389461 0.012322425 -0.1292581 
-0.08012425 0.12841988 -0.033620425 -5.7025776 0.054090414 0.14100702 0.0735518 
-0.055296857 0.121764086 -0.01585382 -0.19469371 0.056806263 0.16898213 
-0.13701764 -0.06280311 0.119968586 -0.0025512849 6.280605E-4 0.12848213 
0.10212754 -0.023070885 0.13707727 -0.13853486 0.21509309 -0.016114214 
0.10025307 0.041132428 -0.11974216 -0.12352202 0.1947182 0.13712671 -0.11699053 
0.16696283
clueweb12-tw-00-06191	0.05369992 -0.08874621 0.22850059 -0.1836124
-0.117735796 -0.27074137 -0.047539733 0.042012293 0.09973079 0.031871755 
0.0653635 0.052989103 -0.121807896 -4.6803567E-4 0.2528799 -0.096173055 
-0.07769931 -0.06987546 0.14199859 -0.17673229 -5.7380853 -0.028545447 
0.3338006 0.13075967 0.13761607 -0.034920916 -0.060133602 0.22424728 
-0.39989826 0.057518493 -0.04785612 0.09987477 0.26938933 0.016046084 
-0.15992445 -0.18638565 0.05115415 -0.16499878 0.0066496585 -0.042277105 
0.14138252 0.06549572 0.015083913 -0.16352524 0.09245014 -0.04816438 0.17806058 
0.16417544 -0.16822924 -0.074308924


Re: Fwd: Need some help

2016-09-01 Thread Aakash Basu
Hey Siva,

It needs to be done with Spark, but without using Spark's built-in ML libraries
(MLlib). I need some help with this.

Thanks,
Aakash.

On Fri, Sep 2, 2016 at 1:25 AM, Sivakumaran S <siva.kuma...@icloud.com>
wrote:

> If you are to do it without Spark, you are asking at the wrong place. Try
> Python + scikit-learn. Or R. If you want to do it with a UI based software,
> try Weka or Orange.
>
> Regards,
>
> Sivakumaran S
>
> On 1 Sep 2016 8:42 p.m., Aakash Basu <aakash.spark@gmail.com> wrote:
>
>
> ------ Forwarded message --
> From: *Aakash Basu* <aakash.spark@gmail.com>
> Date: Thu, Aug 25, 2016 at 10:06 PM
> Subject: Need some help
> To: user@spark.apache.org
>
>
> Hi all,
>
> Aakash here, need a little help in KMeans clustering.
>
> This is needed to be done:
>
> "Implement Kmeans Clustering Algorithm without using the libraries of
> Spark. You're given a txt file with object ids and features from which you
> have to use the features as your data points. This will be a part of the
> code itself"
>
> PFA the file with ObjectIDs and features. Now how to go ahead and work on
> it?
>
> Thanks,
> Aakash.
>
>
>


Re: Fwd: Need some help

2016-09-02 Thread Aakash Basu
Hi Shashank/All,

Yes you got it right, that's what I need to do. Can I get some help in
this? I've no clue what it is and how to work on it.

Thanks,
Aakash.

On Fri, Sep 2, 2016 at 1:48 AM, Shashank Mandil <mandil.shash...@gmail.com>
wrote:

> Hi Aakash,
>
> I think what it generally means that you have to use the general spark
> APIs of Dataframe to bring in the data and crunch the numbers, however you
> cannot use the KMeansClustering algorithm which is already present in the
> MLlib spark library.
>
> I think a good place to start would be understanding what the KMeans
> clustering algorithm is and then looking into how you can use the DataFrame
> API to implement the KMeansClustering.
>
> Thanks,
> Shashank
>
> On Thu, Sep 1, 2016 at 1:05 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hey Siva,
>>
>> It needs to be done with Spark, but without using Spark's built-in ML
>> libraries (MLlib). I need some help with this.
>>
>> Thanks,
>> Aakash.
>>
>> On Fri, Sep 2, 2016 at 1:25 AM, Sivakumaran S <siva.kuma...@icloud.com>
>> wrote:
>>
>>> If you are to do it without Spark, you are asking at the wrong place.
>>> Try Python + scikit-learn. Or R. If you want to do it with a UI based
>>> software, try Weka or Orange.
>>>
>>> Regards,
>>>
>>> Sivakumaran S
>>>
>>> On 1 Sep 2016 8:42 p.m., Aakash Basu <aakash.spark@gmail.com> wrote:
>>>
>>>
>>> -- Forwarded message --
>>> From: *Aakash Basu* <aakash.spark@gmail.com>
>>> Date: Thu, Aug 25, 2016 at 10:06 PM
>>> Subject: Need some help
>>> To: user@spark.apache.org
>>>
>>>
>>> Hi all,
>>>
>>> Aakash here, need a little help in KMeans clustering.
>>>
>>> This is needed to be done:
>>>
>>> "Implement Kmeans Clustering Algorithm without using the libraries of
>>> Spark. You're given a txt file with object ids and features from which you
>>> have to use the features as your data points. This will be a part of the
>>> code itself"
>>>
>>> PFA the file with ObjectIDs and features. Now how to go ahead and work
>>> on it?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>>
>>>
>>
>
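
Along the lines Shashank suggests, here is a minimal Scala sketch of K-means written with plain RDD operations (no MLlib). The file name, the value of k, the convergence threshold, and the assumption that each line is an object id followed by whitespace-separated numeric features are all placeholders for the attached data.

import org.apache.spark.{SparkConf, SparkContext}

object SimpleKMeans {
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def closest(p: Array[Double], centers: Array[Array[Double]]): Int =
    centers.indices.minBy(i => sqDist(p, centers(i)))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SimpleKMeans"))

    // Each line: object id followed by whitespace-separated numeric features.
    val points = sc.textFile("features.txt")
      .map(_.trim.split("\\s+"))
      .map(tokens => tokens.drop(1).map(_.toDouble))
      .cache()

    val k = 5
    var centers = points.takeSample(withReplacement = false, k, seed = 42L)
    var movement = Double.MaxValue

    while (movement > 1e-4) {
      // Assign every point to its nearest center, then average each cluster.
      val newCenters = points
        .map(p => (closest(p, centers), (p, 1L)))
        .reduceByKey { case ((s1, c1), (s2, c2)) =>
          (s1.zip(s2).map { case (x, y) => x + y }, c1 + c2)
        }
        .mapValues { case (sum, count) => sum.map(_ / count) }
        .collectAsMap()

      // How far did the centers move this iteration?
      movement = newCenters.map { case (i, c) => sqDist(centers(i), c) }.sum
      newCenters.foreach { case (i, c) => centers = centers.updated(i, c) }
    }

    centers.zipWithIndex.foreach { case (c, i) => println(s"center $i: ${c.mkString(",")}") }
    sc.stop()
  }
}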


Join Query

2016-11-17 Thread Aakash Basu
Hi,




Conceptually I can understand the Spark joins below, but when it comes to
implementation I don't find much information on Google. Please help me with
code/pseudocode for the joins below using Java Spark or Scala Spark.



*Replication Join:*

Given two datasets, where one is small enough to fit into
the memory, perform a Replicated join using Spark.

Note: Need a program to justify this fits for Replication Join.



*Semi-Join:*

Given a huge dataset, do a semi-join using spark. Note
that, with semi-join, one dataset needs to do Filter and projection to fit
into the cache.

Note: Need a program to justify this fits for Semi-Join.





*Composite Join:*

Given a dataset that is still too big after filtering and
cannot fit into memory, perform a composite join on pre-sorted and
pre-partitioned data using Spark.

Note: Need a program to justify this fits for composite Join.





*Repartition join:*

Join two datasets by performing Repartition join in spark.

Note: Need a program to justify this fits for repartition Join.
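
Here is a hedged Scala sketch of how the first two (and the last) of the join styles above are typically expressed with DataFrames; the paths and the join key "id" are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.appName("join-styles").getOrCreate()
val big   = spark.read.parquet("/data/big_table")   // large dataset
val small = spark.read.parquet("/data/small_table") // small enough to fit in memory

// Replication (map-side / broadcast) join: the small side is copied to every
// executor, so the big side is never shuffled.
val replicated = big.join(broadcast(small), Seq("id"))

// Semi-join: keep only the rows of `big` that have a match in `small`; projecting
// the small side down to just the key first is what lets it fit in the cache.
val semi = big.join(broadcast(small.select("id").distinct()), Seq("id"), "left_semi")

// Repartition (reduce-side / shuffle) join: the default when neither side is
// broadcast; both sides are shuffled (partitioned) on the join key.
val repartitioned = big.join(small, Seq("id"))

replicated.explain() // compare BroadcastHashJoin vs SortMergeJoin in the plans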






Thanks,

Aakash.


HDPCD SPARK Certification Queries

2016-11-17 Thread Aakash Basu
Hi all,


I want to know more about this examination -
http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark


If anyone's there who appeared for the examination, can you kindly help?

1) What are the kind of questions that come,

2) Samples,

3) All the other details.

Thanks,
Aakash.


Hortonworks Spark Certification Query

2016-12-14 Thread Aakash Basu
Hi all,

Is there anyone who wrote the HDPCD examination as in the below link?

http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark

I'm going to sit for this with very little time to prepare; could I please get
some help with the kind of questions to expect and their probable solutions?

This is my first certification examination ever, so any kind help will be
highly appreciated.

Thanks,
Aakash.


Re: community feedback on RedShift with Spark

2017-04-24 Thread Aakash Basu
Hey Afshin,

Your option 1 is immensely faster than the second one.

It speeds things up even further if you know how to properly use DISTKEY and
SORTKEY on the tables being loaded.

Thanks,
Aakash.
https://www.linkedin.com/in/aakash-basu-5278b363
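
A hedged Scala sketch of option 1 (Spark lands the data in S3, then Redshift COPYs it in); the bucket, table and IAM role names are placeholders, and the COPY statement is only indicative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("redshift-load").getOrCreate()
val result = spark.read.parquet("/data/output") // whatever needs to land in Redshift

// 1. Write the result to S3 as compressed delimited files.
result.write
  .mode("overwrite")
  .option("compression", "gzip")
  .csv("s3a://my-bucket/staging/my_table/")

// 2. Then, on the Redshift side (psql/JDBC), load the whole prefix in one COPY, roughly:
//    COPY my_schema.my_table
//    FROM 's3://my-bucket/staging/my_table/'
//    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
//    CSV GZIP;
// Matching the table's DISTKEY/SORTKEY to the most common join/filter columns,
// as noted above, speeds up both the load and the queries that follow.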


On 24-Apr-2017 10:37 PM, "Afshin, Bardia" <bardia.afs...@capitalone.com>
wrote:

I wanted to reach out to the community to get an understanding of what
everyone's experience is with regard to maximizing performance, as in
decreasing the load time when loading multiple large datasets into Redshift.



Two approaches:

1.   Spark writes file to S3, RedShift COPY INTO from S3 bucket.

2.   Spark directly writes results to RedShfit via JDBC



JDBC is known for poor performance, and Redshift (without providing any
examples) claims you can speed up loading from S3 buckets via different queues
set up in your Redshift Workload Management.



What's the community's experience with designing processes in which large
datasets need to be pushed into Redshift, while minimizing the time taken to
load the data?

--

The information contained in this e-mail is confidential and/or proprietary
to Capital One and/or its affiliates and may only be used solely in
performance of work or services for Capital One. The information
transmitted herewith is intended only for use by the individual or entity
to which it is addressed. If the reader of this message is not the intended
recipient, you are hereby notified that any review, retransmission,
dissemination, distribution, copying or other use of, or taking of any
action in reliance upon this information is strictly prohibited. If you
have received this communication in error, please contact the sender and
delete the material from your computer.


Reading Excel (.xlsm) file through PySpark 2.1.1 with external JAR is causing fatal conversion of data type

2017-08-16 Thread Aakash Basu
Hi all,

I am working on PySpark (*Python 3.6 and Spark 2.1.1*) and trying to fetch
data from an excel file using
*spark.read.format("com.crealytics.spark.excel")*, but it is inferring
double for a date type column.

The detailed description is given here (the question I posted) -

https://stackoverflow.com/questions/45713699/inferschema-using-spark-read-formatcom-crealytics-spark-excel-is-inferring-d


Found it is a probable bug with the crealytics excel read package.

Can somebody help me with a workaround for this?

Thanks,
Aakash.


Re: Reading Excel (.xlsm) file through PySpark 2.1.1 with external JAR is causing fatal conversion of data type

2017-08-16 Thread Aakash Basu
Hey all,

Forgot to attach the link to the overriding Schema through external
package's discussion.

https://github.com/crealytics/spark-excel/pull/13

You can see my comment there too.

Thanks,
Aakash.

On Wed, Aug 16, 2017 at 11:11 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hi all,
>
> I am working on PySpark (*Python 3.6 and Spark 2.1.1*) and trying to
> fetch data from an excel file using
> *spark.read.format("com.crealytics.spark.excel")*, but it is inferring
> double for a date type column.
>
> The detailed description is given here (the question I posted) -
>
> https://stackoverflow.com/questions/45713699/inferschema-using-spark-read-
> formatcom-crealytics-spark-excel-is-inferring-d
>
>
> Found it is a probable bug with the crealytics excel read package.
>
> Can somebody help me with a workaround for this?
>
> Thanks,
> Aakash.
>


Re: Reading Excel (.xlsm) file through PySpark 2.1.1 with external JAR is causing fatal conversion of data type

2017-08-16 Thread Aakash Basu
Hey Irving,

Thanks for a quick revert. In Excel that column is purely string, I
actually want to import that as a String and later play around the DF to
convert it back to date type, but the API itself is not allowing me to
dynamically assign a Schema to the DF and I'm forced to inferSchema, where
itself, it is converting all numeric columns to double (Though, I don't
know how then the date column is getting converted to double if it is
string in the Excel source).

Thanks,
Aakash.


On 16-Aug-2017 11:39 PM, "Irving Duran" <irving.du...@gmail.com> wrote:

I think there is a difference between the actual value in the cell and what
Excel formats that cell.  You probably want to import that field as a
string or not have it as a date format in Excel.

Just a thought


Thank You,

Irving Duran

On Wed, Aug 16, 2017 at 12:47 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hey all,
>
> Forgot to attach the link to the overriding Schema through external
> package's discussion.
>
> https://github.com/crealytics/spark-excel/pull/13
>
> You can see my comment there too.
>
> Thanks,
> Aakash.
>
> On Wed, Aug 16, 2017 at 11:11 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am working on PySpark (*Python 3.6 and Spark 2.1.1*) and trying to
>> fetch data from an excel file using
>> *spark.read.format("com.crealytics.spark.excel")*, but it is inferring
>> double for a date type column.
>>
>> The detailed description is given here (the question I posted) -
>>
>> https://stackoverflow.com/questions/45713699/inferschema-usi
>> ng-spark-read-formatcom-crealytics-spark-excel-is-inferring-d
>>
>>
>> Found it is a probable bug with the crealytics excel read package.
>>
>> Can somebody help me with a workaround for this?
>>
>> Thanks,
>> Aakash.
>>
>
>


Re: Reading Excel (.xlsm) file through PySpark 2.1.1 with external JAR is causing fatal conversion of data type

2017-08-17 Thread Aakash Basu
Hey all,

Thanks! I had a discussion with the person who authored that package and
informed about this bug, but in the meantime with the same thing, found a
small tweak to ensure the job is done.

Now that is fine, I'm getting the date as a string by predefining the
Schema but I want to later convert it to a datetime format, which is making
it this -

>>> from pyspark.sql.functions import from_unixtime, unix_timestamp
>>> df2 = dflead.select('Enter_Date',
from_unixtime(unix_timestamp('Enter_Date', 'MM/dd/yyy')).alias('date'))


>>> df2.show()

[image: Inline image 1]

This is not correct, as it is converting the 15 to 0015 instead of 2015.
Do you guys think using the DateUtil package will solve this? Or is there any
other solution with the built-in package?

Please help!

Thanks,
Aakash.
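
For what it's worth, here is a small Scala sketch of the same conversion (the identical functions exist in PySpark): if the raw strings look like "08/17/15", the year pattern should be "yy" rather than "yyy"; the "yyy" pattern is what turns 15 into year 0015. The names dflead and Enter_Date are the ones used above, and the sample format is an assumption.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_unixtime, to_date, unix_timestamp}

// Sketch only: parse a two-digit-year date string column into a proper date.
def withParsedDate(dflead: DataFrame): DataFrame =
  dflead.select(
    col("Enter_Date"),
    to_date(from_unixtime(unix_timestamp(col("Enter_Date"), "MM/dd/yy"))).alias("date")
  )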

On Thu, Aug 17, 2017 at 12:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> You can use Apache POI DateUtil to convert double to Date (
> https://poi.apache.org/apidocs/org/apache/poi/ss/usermodel/DateUtil.html).
> Alternatively you can try HadoopOffice (https://github.com/ZuInnoTe/
> hadoopoffice/wiki), it supports Spark 1.x or Spark 2.0 ds.
>
> On 16. Aug 2017, at 20:15, Aakash Basu <aakash.spark@gmail.com> wrote:
>
> Hey Irving,
>
> Thanks for a quick revert. In Excel that column is purely string, I
> actually want to import that as a String and later play around the DF to
> convert it back to date type, but the API itself is not allowing me to
> dynamically assign a Schema to the DF and I'm forced to inferSchema, where
> itself, it is converting all numeric columns to double (Though, I don't
> know how then the date column is getting converted to double if it is
> string in the Excel source).
>
> Thanks,
> Aakash.
>
>
> On 16-Aug-2017 11:39 PM, "Irving Duran" <irving.du...@gmail.com> wrote:
>
> I think there is a difference between the actual value in the cell and
> what Excel formats that cell.  You probably want to import that field as a
> string or not have it as a date format in Excel.
>
> Just a thought
>
>
> Thank You,
>
> Irving Duran
>
> On Wed, Aug 16, 2017 at 12:47 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hey all,
>>
>> Forgot to attach the link to the overriding Schema through external
>> package's discussion.
>>
>> https://github.com/crealytics/spark-excel/pull/13
>>
>> You can see my comment there too.
>>
>> Thanks,
>> Aakash.
>>
>> On Wed, Aug 16, 2017 at 11:11 PM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Hi all,
>>>
>>> I am working on PySpark (*Python 3.6 and Spark 2.1.1*) and trying to
>>> fetch data from an excel file using
>>> *spark.read.format("com.crealytics.spark.excel")*, but it is inferring
>>> double for a date type column.
>>>
>>> The detailed description is given here (the question I posted) -
>>>
>>> https://stackoverflow.com/questions/45713699/inferschema-usi
>>> ng-spark-read-formatcom-crealytics-spark-excel-is-inferring-d
>>>
>>>
>>> Found it is a probable bug with the crealytics excel read package.
>>>
>>> Can somebody help me with a workaround for this?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>>
>
>


Any solution for this?

2017-05-15 Thread Aakash Basu
Hi all,

Any solution for this issue - http://stackoverflow.com/q/43921392/7998705



Thanks,
Aakash.


Re: Use SQL Script to Write Spark SQL Jobs

2017-06-12 Thread Aakash Basu
Hey,

I work on Spark SQL and would pretty much be able to help you in this. Let
me know your requirement.

Thanks,
Aakash.

On 12-Jun-2017 11:00 AM, "bo yang"  wrote:

> Hi Guys,
>
> I am writing a small open source project
>  to use SQL Script to write
> Spark Jobs. Want to see if there are other people interested to use or
> contribute to this project.
>
> The project is called UberScriptQuery (https://github.com/uber/
> uberscriptquery). Sorry for the dumb name to avoid conflict with many
> other names (Spark is registered trademark, thus I could not use Spark in
> my project name).
>
> In short, it is a high level SQL-like DSL (Domain Specific Language) on
> top of Spark. People can use that DSL to write Spark jobs without worrying
> about Spark internal details. Please check README
>  in the project to get more
> details.
>
> It will be great if I could get any feedback or suggestions!
>
> Best,
> Bo
>
>


Re: Spark-SQL collect function

2017-05-19 Thread Aakash Basu
Well described, thanks!

On 04-May-2017 4:07 AM, "JayeshLalwani" 
wrote:

> In any distributed application, you scale up by splitting execution up on
> multiple machines. The way Spark does this is by slicing the data into
> partitions and spreading them on multiple machines. Logically, an RDD is
> exactly that: data is split up and spread around on multiple machines. When
> you perform operations on an RDD, Spark tells all the machines to perform
> that operation on their own slice of data. So, for example, if you perform
> a
> filter operation (or if you are using SQL, you do /Select * from tablename
> where col=colval/, Spark tells each machine to look for rows that match
> your
> filter criteria in their own slice of data. This operation results in
> another distributed dataset that contains the filtered records. Note that
> when you do a filter operation, Spark doesn't move data outside of the
> machines that they reside in. It keeps the filtered records in the same
> machine. This ability of Spark to keep data in place is what provides
> scalability. As long as your operations keep data in place, you can scale
> up
> infinitely. If you got 10x more records, you can add 10x more machines, and
> you will get the same performance
>
> However, the problem is that a lot of operations cannot be done by keeping
> data in place. For example, let's say you have 2 tables/dataframes. Spark
> will slice both up and spread them around the machines. Now let's say, you
> joined both tables. It may happen that the slice of data that resides in
> one
> machine has matching records in another machine. So, now, Spark has to
> bring
> data over from one machine to another. This is what Spark calls a
> /shuffle/. Spark does this intelligently. However, whenever data leaves one
> machine and goes to other machines, you cannot scale infinitely. There will
> be a point at which you will overwhelm the network, and adding more
> machines
> isn't going to improve performance.
>
> So, the point is that you have to avoid shuffles as much as possible. You
> cannot eliminate shuffles altogether, but you can reduce them
>
> Now, /collect/ is the granddaddy of all shuffles. It causes Spark to bring
> all the data that it has distributedd over the machines into a single
> machine. If you call collect on a large table, it's analogous to drinking
> from a firehose. You are going to drown.Calling collect on a small table is
> fine, because very little data will move
>
> Usually, it's recommended to run all your aggregations using Spark SQL, and
> when you get the data boiled down to a small enough size that can be
> presented to a human, you can call collect on it to fetch it and present it
> to the human user.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-SQL-collect-function-tp28644p28647.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
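
A small Scala sketch of the pattern described above: keep filters and aggregations distributed, and only call collect() once the result is tiny. Paths and column names are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, lit}

val spark = SparkSession.builder.appName("collect-sketch").getOrCreate()
val orders = spark.read.parquet("/data/orders")

// Narrow operation: each partition is filtered in place, nothing moves between machines.
val recent = orders.filter(orders("order_date") >= "2017-01-01")

// Wide operation: the groupBy forces a shuffle, but the output is one row per country.
val perCountry = recent
  .groupBy("country")
  .agg(count(lit(1)).as("orders"), avg("amount").as("avg_amount"))

// Safe to collect: only a handful of aggregated rows come back to the driver.
perCountry.collect().foreach(println)

// Dangerous: this would drag every order row onto a single JVM.
// orders.collect()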


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-19 Thread Aakash Basu
Hey all,

A reply on this would be great!

Thanks,
A.B.

On 17-May-2017 1:43 AM, "Daniel Siegmann" 
wrote:

> When using spark.read on a large number of small files, these are
> automatically coalesced into fewer partitions. The only documentation I can
> find on this is in the Spark 2.0.0 release notes, where it simply says (
> http://spark.apache.org/releases/spark-release-2-0-0.html):
>
> "Automatic file coalescing for native data sources"
>
> Can anyone point me to documentation explaining what triggers this
> feature, how it decides how many partitions to coalesce to, and what counts
> as a "native data source"? I couldn't find any mention of this feature in
> the SQL Programming Guide and Google was not helpful.
>
> --
> Daniel Siegmann
> Senior Software Engineer
> *SecurityScorecard Inc.*
> 214 W 29th Street, 5th Floor
> New York, NY 10001
>
>


Repartition vs PartitionBy Help/Understanding needed

2017-06-15 Thread Aakash Basu
Hi all,

Everybody explains the difference between coalesce and repartition, but
nowhere have I found the difference between partitionBy and repartition. My
question is: is it better to write a dataset to Parquet partitioned by a
column and then read the relevant directories to work on that column, or to
use repartition on that column and do the same in memory?


A) One scenario is -

*val partitioned_DF = df_factFundRate.repartition($"YrEqual")//New change
for performance test*


*val df_YrEq_true =
partitioned_DF.filter("YrEqual=true").withColumnRenamed("validFromYr",
"yr_id").drop("validThruYr")*

*val exists = partitioned_DF.filter("YrEqual = false").count()*
*if(exists > 0) *



B) And the other scenario is -

*val df_cluster = sqlContext.sql("select * from factFundRate cluster by
YrEqual")*
*df_factFundRate.coalesce(50).write.mode("overwrite").option("header",
"true").partitionBy("YrEqual").parquet(args(25))*

*val df_YrEq_true =
sqlContext.read.parquet(args(25)+"YrEqual=true/").withColumnRenamed("validFromYr",
"yr_id").drop("validThruYr")*


*val hadoopconf = new Configuration()*
*val fileSystem = FileSystem.get(hadoopconf)*


*val exists = FileSystem.get(new URI(args(26)),
sparkContext.hadoopConfiguration).exists(new
Path(args(25)+"YrEqual=false"))*
*if(exists)*


The second scenario finishes within 6 mins whereas the first scenario takes
13 mins to complete.

Please help!


Thanks in adv,
Aakash.


Fwd: Repartition vs PartitionBy Help/Understanding needed

2017-06-16 Thread Aakash Basu
Hi all,

Can somebody put some light on this pls?

Thanks,
Aakash.
-- Forwarded message --
From: "Aakash Basu" <aakash.spark@gmail.com>
Date: 15-Jun-2017 2:57 PM
Subject: Repartition vs PartitionBy Help/Understanding needed
To: "user" <user@spark.apache.org>
Cc:

Hi all,
>
> Everybody explains the difference between coalesce and repartition, but
> nowhere have I found the difference between partitionBy and repartition. My
> question is: is it better to write a dataset to Parquet partitioned by a
> column and then read the relevant directories to work on that column, or to
> use repartition on that column and do the same in memory?
>
>
> A) One scenario is -
>
> *val partitioned_DF = df_factFundRate.repartition($"YrEqual")//New change
> for performance test*
>
>
> *val df_YrEq_true =
> partitioned_DF.filter("YrEqual=true").withColumnRenamed("validFromYr",
> "yr_id").drop("validThruYr")*
>
> *val exists = partitioned_DF.filter("YrEqual = false").count()*
> *if(exists > 0) *
>
>
>
> B) And the other scenario is -
>
> *val df_cluster = sqlContext.sql("select * from factFundRate cluster by
> YrEqual")*
> *df_factFundRate.coalesce(50).write.mode("overwrite").option("header",
> "true").partitionBy("YrEqual").parquet(args(25))*
>
> *val df_YrEq_true =
> sqlContext.read.parquet(args(25)+"YrEqual=true/").withColumnRenamed("validFromYr",
> "yr_id").drop("validThruYr")*
>
>
> *val hadoopconf = new Configuration()*
> *val fileSystem = FileSystem.get(hadoopconf)*
>
>
> *val exists = FileSystem.get(new URI(args(26)),
> sparkContext.hadoopConfiguration).exists(new
> Path(args(25)+"YrEqual=false"))*
> *if(exists)*
>
>
> The second scenario finishes within 6 mins whereas the first scenario
> takes 13 mins to complete.
>
> Please help!
>
>
> Thanks in adv,
> Aakash.
>


Help needed in Dividing open close dates column into multiple columns in dataframe

2017-09-19 Thread Aakash Basu
Hi,

I've a csv dataset which has a column with all the details of store open
and close timings as per dates, but the data is highly variant, as follows -


Mon-Fri 10am-9pm, Sat 10am-8pm, Sun 12pm-6pm
Mon-Sat 10am-8pm, Sun Closed
Mon-Sat 10am-8pm, Sun 10am-6pm
Mon-Friday 9-8 / Saturday 10-7 / Sunday 11-5
Mon-Sat 9am-8pm, Sun 10am-7pm
Mon-Sat 10am-8pm, 11am - 6pm
Mon-Fri 9am-6pm, Sat 10am-5pm, Sun Closed
Mon-Thur 10am-7pm, Fri 10am-5pm, Sat Closed, Sun 10am-5pm
Mon-Sat 10-7 Sun Closed
MON-FRI 10:00-8:00, SAT 10:00-7:00, SUN 12:00-5:00


I have to split the data of this one column into 14 columns, as -

Monday Open Time
Monday Close Time
Tuesday Open Time
Tuesday Close Time
Wednesday Open Time
Wednesday Close Time
Thursday Open Time
Thursday Close Time
Friday Open Time
Friday Close Time
Saturday Open Time
Saturday Close Time
Sunday Open Time
Sunday Close Time

Can someone please let me know if anyone has faced a similar issue, and how
they resolved it with Spark SQL DataFrames?

Using: CSV data, Spark 2.1, PySpark, using dataframes. (Tried using case
statement.)

Thanks,
Aakash.
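
For whatever it's worth, one hedged way to attack this in Scala (the same shape works as a PySpark UDF) is to split the string into day-range segments, expand the ranges, and emit an open/close pair per weekday. The sketch below only covers the common patterns shown above ("Mon-Fri 10am-9pm", "Sun Closed", "Mon-Friday 9-8 / ..."); the irregular ones, such as a segment with no day name, would need extra rules, and all names are placeholders.

import org.apache.spark.sql.functions.udf

object StoreHours {
  private val days = Vector("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")

  private def dayIdx(s: String): Option[Int] = {
    val key = s.trim.toLowerCase.take(3)           // "Monday", "Thur", "SAT" -> "mon", "thu", "sat"
    val i = days.indexWhere(_.toLowerCase == key)
    if (i >= 0) Some(i) else None
  }

  // "Mon-Fri 10am-9pm" -> Seq((0,"10am","9pm"), ..., (4,"10am","9pm")); empty if unparseable.
  private def parseSegment(seg: String): Seq[(Int, String, String)] = {
    val pattern = """(?i)\s*([a-z]+)(?:\s*-\s*([a-z]+))?\s+(.+?)\s*""".r
    seg match {
      case pattern(d1, d2, hours) =>
        val from = dayIdx(d1)
        val to = Option(d2).flatMap(dayIdx).orElse(from)
        (from, to) match {
          case (Some(f), Some(t)) if f <= t =>
            val range = f to t
            if (hours.trim.equalsIgnoreCase("closed")) range.map(i => (i, "Closed", "Closed"))
            else hours.split("-").map(_.trim) match {
              case Array(open, close) => range.map(i => (i, open, close))
              case _                  => Seq.empty
            }
          case _ => Seq.empty
        }
      case _ => Seq.empty
    }
  }

  // Returns 14 slots: Mon open, Mon close, Tue open, ..., Sun close ("" if unknown).
  def expand(raw: String): Seq[String] = {
    val slots = Array.fill(14)("")
    if (raw != null) {
      raw.split("[,/]").flatMap(parseSegment).foreach { case (d, open, close) =>
        slots(2 * d) = open
        slots(2 * d + 1) = close
      }
    }
    slots.toSeq
  }

  val expandUdf = udf(expand _)
}

// Usage sketch: add one array column, then pull the 14 positions out as named columns.
// val withHours = df.withColumn("hours", StoreHours.expandUdf(col("store_hours")))
//   .withColumn("Monday Open Time", col("hours")(0))
//   .withColumn("Monday Close Time", col("hours")(1)) // ... and so on for the other days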


Efficient Spark-Submit planning

2017-09-11 Thread Aakash Basu
Hi,

Can someone please clarify how we should effectively calculate the parameters
to be passed via spark-submit?

Parameters as in -

Cores, NumExecutors, DriverMemory, etc.

Is there any generic calculation which works for most kinds of clusters, with
sizes ranging from a small 3-node cluster to hundreds of nodes?

Thanks,
Aakash.


Re: Reading Excel (.xlsm) file through PySpark 2.1.1 with external JAR is causing fatal conversion of data type

2017-08-17 Thread Aakash Basu
Hi Palwell,

Tried doing that, but its becoming null for all the dates after the
transformation with functions.

df2 = dflead.select('Enter_Date',f.to_date(df2.Enter_Date))


[image: Inline image 1]

Any insight?

Thanks,
Aakash.

On Fri, Aug 18, 2017 at 12:23 AM, Patrick Alwell <palw...@hortonworks.com>
wrote:

> Aakash,
>
> I’ve had similar issues with date-time formatting. Try using the functions
> library from pyspark.sql and the DF withColumns() method.
>
> ——
>
> from pyspark.sql import functions as f
>
> lineitem_df = lineitem_df.withColumn('shipdate',f.to_date(lineitem_
> df.shipdate))
>
> ——
>
> You should have first ingested the column as a string; and then leveraged
> the DF api to make the conversion to dateType.
>
> That should work.
>
> Kind Regards
>
> -Pat Alwell
>
>
> On Aug 17, 2017, at 11:48 AM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
> Hey all,
>
> Thanks! I had a discussion with the person who authored that package and
> informed about this bug, but in the meantime with the same thing, found a
> small tweak to ensure the job is done.
>
> Now that is fine, I'm getting the date as a string by predefining the
> Schema but I want to later convert it to a datetime format, which is making
> it this -
>
> >>> from pyspark.sql.functions import from_unixtime, unix_timestamp
> >>> df2 = dflead.select('Enter_Date', 
> >>> from_unixtime(unix_timestamp('Enter_Date',
> 'MM/dd/yyy')).alias('date'))
>
>
> >>> df2.show()
>
> 
>
> Which is not correct (as it is converting the 15 to 0015 instead of 2015.
> Do you guys think using the DateUtil package will solve this? Or any other
> solution with this built-in package?
>
> Please help!
>
> Thanks,
> Aakash.
>
> On Thu, Aug 17, 2017 at 12:01 AM, Jörn Franke <jornfra...@gmail.com>
> wrote:
>
>> You can use Apache POI DateUtil to convert double to Date (
>> https://poi.apache.org/apidocs/org/apache/poi/ss/usermodel/DateUtil.html).
>> Alternatively you can try HadoopOffice (https://github.com/ZuInnoTe/h
>> adoopoffice/wiki), it supports Spark 1.x or Spark 2.0 ds.
>>
>> On 16. Aug 2017, at 20:15, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>> Hey Irving,
>>
>> Thanks for a quick revert. In Excel that column is purely string, I
>> actually want to import that as a String and later play around the DF to
>> convert it back to date type, but the API itself is not allowing me to
>> dynamically assign a Schema to the DF and I'm forced to inferSchema, where
>> itself, it is converting all numeric columns to double (Though, I don't
>> know how then the date column is getting converted to double if it is
>> string in the Excel source).
>>
>> Thanks,
>> Aakash.
>>
>>
>> On 16-Aug-2017 11:39 PM, "Irving Duran" <irving.du...@gmail.com> wrote:
>>
>> I think there is a difference between the actual value in the cell and
>> what Excel formats that cell.  You probably want to import that field as a
>> string or not have it as a date format in Excel.
>>
>> Just a thought....
>>
>>
>> Thank You,
>>
>> Irving Duran
>>
>> On Wed, Aug 16, 2017 at 12:47 PM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Hey all,
>>>
>>> Forgot to attach the link to the overriding Schema through external
>>> package's discussion.
>>>
>>> https://github.com/crealytics/spark-excel/pull/13
>>>
>>> You can see my comment there too.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On Wed, Aug 16, 2017 at 11:11 PM, Aakash Basu <
>>> aakash.spark@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am working on PySpark (*Python 3.6 and Spark 2.1.1*) and trying to
>>>> fetch data from an excel file using
>>>> *spark.read.format("com.crealytics.spark.excel")*, but it is inferring
>>>> double for a date type column.
>>>>
>>>> The detailed description is given here (the question I posted) -
>>>>
>>>> https://stackoverflow.com/questions/45713699/inferschema-usi
>>>> ng-spark-read-formatcom-crealytics-spark-excel-is-inferring-d
>>>>
>>>>
>>>> Found it is a probable bug with the crealytics excel read package.
>>>>
>>>> Can somebody help me with a workaround for this?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>
>>>
>>
>>
>
>


Problem with CSV line break data in PySpark 2.1.0

2017-09-03 Thread Aakash Basu
Hi,

I have a dataset where, as shown below, a few rows of column F contain line
breaks in the CSV file.

[image: Inline image 1]

When Spark is reading it, it is coming as below, which is a complete new
line.

[image: Inline image 2]

I want my PySpark 2.1.0 job to read it while forcefully ignoring the line
break after the date, which is not happening since I am using the
com.databricks CSV reader. Instead, nulls are getting created after the date
on line 2 for the rest of the columns from G onwards.

Can someone please help me with how to handle this?

Thanks,
Aakash.
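
A hedged note and sketch (in Scala, with spark being the SparkSession; the same option reads as .option("multiLine", True) in PySpark): Spark 2.2+'s built-in CSV reader can keep a quoted embedded newline inside a single record via the multiLine option, whereas on 2.1 with the Databricks reader there is no equivalent switch as far as I know, so upgrading or pre-cleaning the file is usually the practical route. The path is a placeholder.

val df = spark.read
  .option("header", "true")
  .option("multiLine", "true") // treat quoted embedded newlines as part of the field
  .csv("/data/input/file_with_linebreaks.csv")

df.show(5, truncate = false)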


Re: Dynamic data ingestion into SparkSQL - Interesting question

2017-11-20 Thread Aakash Basu
Hi all,

Any help? PFB.

Thanks,
Aakash.

On 20-Nov-2017 6:58 PM, "Aakash Basu" <aakash.spark@gmail.com> wrote:

> Hi all,
>
> I have a table which will have 4 columns -
>
> |  Expression|filter_condition| from_clause|
> group_by_columns|
>
>
> This file may have variable number of rows depending on the no. of KPIs I
> need to calculate.
>
> I need to write a SparkSQL program which will have to read this file and
> run each line of queries dynamically by fetching each column value for a
> particular row and create a select query out of it and run inside a
> dataframe, later saving it as a temporary table.
>
> Did anyone do this kind of exercise? If yes, can I get some help on it pls?
>
> Thanks,
> Aakash.
>


Re: Dynamic data ingestion into SparkSQL - Interesting question

2017-11-21 Thread Aakash Basu
Yes, I did the same. It's working. Thanks!

On 21-Nov-2017 4:04 PM, "Fernando Pereira" <ferdonl...@gmail.com> wrote:

> Did you consider do string processing to build the SQL expression which
> you can execute with spark.sql(...)?
> Some examples: https://spark.apache.org/docs/latest/sql-
> programming-guide.html#hive-tables
>
> Cheers
>
> On 21 November 2017 at 03:27, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi all,
>>
>> Any help? PFB.
>>
>> Thanks,
>> Aakash.
>>
>> On 20-Nov-2017 6:58 PM, "Aakash Basu" <aakash.spark@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a table which will have 4 columns -
>>>
>>> |  Expression|filter_condition| from_clause|
>>> group_by_columns|
>>>
>>>
>>> This file may have variable number of rows depending on the no. of KPIs
>>> I need to calculate.
>>>
>>> I need to write a SparkSQL program which will have to read this file and
>>> run each line of queries dynamically by fetching each column value for a
>>> particular row and create a select query out of it and run inside a
>>> dataframe, later saving it as a temporary table.
>>>
>>> Did anyone do this kind of exercise? If yes, can I get some help on it
>>> pls?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>
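A minimal sketch of the string-building approach suggested above, assuming the four-column driver table has already been read into a dataframe; all column names, the path and the view names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dynamic_kpi").getOrCreate()

# Driver table: one row per KPI with expression, filter_condition, from_clause, group_by_columns
kpi_rows = spark.read.option("header", "true").csv("/path/to/kpi_definitions.csv").collect()

for i, row in enumerate(kpi_rows):
    sql = "SELECT {expr} FROM {frm}".format(expr=row["Expression"], frm=row["from_clause"])
    if row["filter_condition"]:
        sql += " WHERE " + row["filter_condition"]
    if row["group_by_columns"]:
        sql += " GROUP BY " + row["group_by_columns"]
    result_df = spark.sql(sql)                               # run the dynamically built query
    result_df.createOrReplaceTempView("kpi_{0}".format(i))   # save it as a temporary table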


Dynamic data ingestion into SparkSQL - Interesting question

2017-11-20 Thread Aakash Basu
Hi all,

I have a table which will have 4 columns -

|  Expression|filter_condition| from_clause|
group_by_columns|


This file may have variable number of rows depending on the no. of KPIs I
need to calculate.

I need to write a SparkSQL program which will have to read this file and
run each line of queries dynamically by fetching each column value for a
particular row and create a select query out of it and run inside a
dataframe, later saving it as a temporary table.

Did anyone do this kind of exercise? If yes, can I get some help on it pls?

Thanks,
Aakash.


Fwd: Regarding column partitioning IDs and names as per hierarchical level SparkSQL

2017-10-31 Thread Aakash Basu
Hey all,

Any help in the below please?

Thanks,
Aakash.


-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Tue, Oct 31, 2017 at 9:17 PM
Subject: Regarding column partitioning IDs and names as per hierarchical
level SparkSQL
To: user <user@spark.apache.org>


Hi all,

I have to generate a table with Spark-SQL with the following columns -


Level One Id: VARCHAR(20) NULL
Level One Name: VARCHAR(50) NOT NULL
Level Two Id: VARCHAR(20) NULL
Level Two Name: VARCHAR(50) NULL
Level Three Id: VARCHAR(20) NULL
Level Three Name: VARCHAR(50) NULL
Level Four Id: VARCHAR(20) NULL
Level Four Name: VARCHAR(50) NULL
Level Five Id: VARCHAR(20) NULL
Level Five Name: VARCHAR(50) NULL
Level Six Id: VARCHAR(20) NULL
Level Six Name: VARCHAR(50) NULL
Level Seven Id: VARCHAR(20) NULL
Level Seven Name: VARCHAR(50) NULL
Level Eight Id: VARCHAR(20) NULL
Level Eight Name: VARCHAR(50) NULL
Level Nine Id: VARCHAR(20) NULL
Level Nine Name: VARCHAR(50) NULL
Level Ten Id: VARCHAR(20) NULL
Level Ten Name: VARCHAR(50) NULL

My input source has these columns -


ID Description ParentID
10 Great-Grandfather
1010 Grandfather 10
101010 1. Father A 1010
101011 2. Father B 1010
101012 4. Father C 1010
101013 5. Father D 1010
101015 3. Father E 1010
101018 Father F 1010
101019 6. Father G 1010
101020 Father H 1010
101021 Father I 1010
101022 2A. Father J 1010
10101010 2. Father K 101010
Like the above, I have ID till 20 digits, which means, I have 10 levels.

I want to populate the ID and name itself along with all the parents till
the root for any particular level, which I am unable to create a concrete
logic for.

Am using this way to fetch respecting levels and populate them in the
respective columns but not their parents -

Present Logic ->

FinalJoin_DF = spark.sql("select "
  + "case when length(a.id)/2 = '1' then a.id else
' ' end as level_one_id, "
  + "case when length(a.id)/2 = '1' then a.desc else ' ' end as
level_one_name, "
  + "case when length(a.id)/2 = '2' then a.id else ' ' end as level_two_id,
"
  + "case when length(a.id)/2 = '2' then a.desc else ' ' end as
level_two_name, "
  + "case when length(a.id)/2 = '3' then a.id else
' ' end as level_three_id, "
  + "case when length(a.id)/2 = '3' then a.desc
else ' ' end as level_three_name, "
  + "case when length(a.id)/2 = '4' then a.id else
' ' end as level_four_id, "
  + "case when length(a.id)/2 = '4' then a.desc
else ' ' end as level_four_name, "
  + "case when length(a.id)/2 = '5' then a.id else
' ' end as level_five_id, "
  + "case when length(a.id)/2 = '5' then a.desc
else ' ' end as level_five_name, "
  + "case when length(a.id)/2 = '6' then a.id else
' ' end as level_six_id, "
  + "case when length(a.id)/2 = '6' then a.desc else ' ' end as
level_six_name, "
  + "case when length(a.id)/2 = '7' then a.id else ' ' end as
level_seven_id, "
  + "case when length(a.id)/2 = '7' then a.desc
else ' ' end as level_seven_name, "
  + "case when length(a.id)/2 = '8' then a.id else
' ' end as level_eight_id, "
  + "case when length(a.id)/2 = '8' then a.desc else ' ' end as
level_eight_name, "
  + "case when length(a.id)/2 = '9' then a.id else
' ' end as level_nine_id, "
  + "case when length(a.id)/2 = '9' then a.desc else ' ' end as
level_nine_name, "
  + "case when length(a.id)/2 = '10' then a.id else ' ' end as
level_ten_id, "
  + "case when length(a.id)/2 = '10' then a.desc
else ' ' end as level_ten_name "
  + "from CategoryTempTable a")


Can someone help me in also populating all the parents levels in the
respective level ID and level name, please?


Thanks,
Aakash.
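One possible approach, sketched only, under the assumption (implied by the length(a.id)/2 logic above) that every level adds exactly two digits to the ID: derive each ancestor ID as a prefix of the row's own ID and join back to the source table for the ancestor's name. The source column names below are assumed.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("hierarchy_levels").getOrCreate()

src = spark.table("CategoryTempTable")   # assumed columns: id, desc, parent_id

levels = src
for lvl in range(1, 11):
    prefix_len = 2 * lvl
    # The ancestor ID at this level is simply the first 2*lvl digits of the row's own ID
    levels = levels.withColumn("level_{0}_id".format(lvl),
                               F.when(F.length("id") >= prefix_len,
                                      F.substring("id", 1, prefix_len)))
    # Join back to the source table to fetch that ancestor's description as the level name
    names = src.select(F.col("id").alias("level_{0}_id".format(lvl)),
                       F.col("desc").alias("level_{0}_name".format(lvl)))
    levels = levels.join(names, on="level_{0}_id".format(lvl), how="left")

levels.show(truncate=False)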


Regarding column partitioning IDs and names as per hierarchical level SparkSQL

2017-10-31 Thread Aakash Basu
Hi all,

I have to generate a table with Spark-SQL with the following columns -


Level One Id: VARCHAR(20) NULL
Level One Name: VARCHAR(50) NOT NULL
Level Two Id: VARCHAR(20) NULL
Level Two Name: VARCHAR(50) NULL
Level Three Id: VARCHAR(20) NULL
Level Three Name: VARCHAR(50) NULL
Level Four Id: VARCHAR(20) NULL
Level Four Name: VARCHAR(50) NULL
Level Five Id: VARCHAR(20) NULL
Level Five Name: VARCHAR(50) NULL
Level Six Id: VARCHAR(20) NULL
Level Six Name: VARCHAR(50) NULL
Level Seven Id: VARCHAR(20) NULL
Level Seven Name: VARCHAR(50) NULL
Level Eight Id: VARCHAR(20) NULL
Level Eight Name: VARCHAR(50) NULL
Level Nine Id: VARCHAR(20) NULL
Level Nine Name: VARCHAR(50) NULL
Level Ten Id: VARCHAR(20) NULL
Level Ten Name: VARCHAR(50) NULL

My input source has these columns -


ID Description ParentID
10 Great-Grandfather
1010 Grandfather 10
101010 1. Father A 1010
101011 2. Father B 1010
101012 4. Father C 1010
101013 5. Father D 1010
101015 3. Father E 1010
101018 Father F 1010
101019 6. Father G 1010
101020 Father H 1010
101021 Father I 1010
101022 2A. Father J 1010
10101010 2. Father K 101010
Like the above, I have ID till 20 digits, which means, I have 10 levels.

I want to populate the ID and name itself along with all the parents till
the root for any particular level, which I am unable to create a concrete
logic for.

Am using this way to fetch respecting levels and populate them in the
respective columns but not their parents -

Present Logic ->

FinalJoin_DF = spark.sql("select "
  + "case when length(a.id)/2 = '1' then a.id else
' ' end as level_one_id, "
  + "case when length(a.id)/2 = '1' then a.desc else ' ' end as
level_one_name, "
  + "case when length(a.id)/2 = '2' then a.id else ' ' end as level_two_id,
"
  + "case when length(a.id)/2 = '2' then a.desc else ' ' end as
level_two_name, "
  + "case when length(a.id)/2 = '3' then a.id else
' ' end as level_three_id, "
  + "case when length(a.id)/2 = '3' then a.desc
else ' ' end as level_three_name, "
  + "case when length(a.id)/2 = '4' then a.id else
' ' end as level_four_id, "
  + "case when length(a.id)/2 = '4' then a.desc
else ' ' end as level_four_name, "
  + "case when length(a.id)/2 = '5' then a.id else
' ' end as level_five_id, "
  + "case when length(a.id)/2 = '5' then a.desc
else ' ' end as level_five_name, "
  + "case when length(a.id)/2 = '6' then a.id else
' ' end as level_six_id, "
  + "case when length(a.id)/2 = '6' then a.desc else ' ' end as
level_six_name, "
  + "case when length(a.id)/2 = '7' then a.id else ' ' end as
level_seven_id, "
  + "case when length(a.id)/2 = '7' then a.desc
else ' ' end as level_seven_name, "
  + "case when length(a.id)/2 = '8' then a.id else
' ' end as level_eight_id, "
  + "case when length(a.id)/2 = '8' then a.desc else ' ' end as
level_eight_name, "
  + "case when length(a.id)/2 = '9' then a.id else
' ' end as level_nine_id, "
  + "case when length(a.id)/2 = '9' then a.desc else ' ' end as
level_nine_name, "
  + "case when length(a.id)/2 = '10' then a.id else ' ' end as
level_ten_id, "
  + "case when length(a.id)/2 = '10' then a.desc
else ' ' end as level_ten_name "
  + "from CategoryTempTable a")


Can someone help me in also populating all the parents levels in the
respective level ID and level name, please?


Thanks,
Aakash.


RE: Split column with dynamic data

2017-10-30 Thread Aakash Basu
Hey buddy,


Thanks a TON! Issue resolved.

Thanks again,
Aakash.

On 30-Oct-2017 11:44 PM, "Hondros, Constantine (ELS-AMS)" <
c.hond...@elsevier.com> wrote:

> You should just use regexp_replace to remove all the leading number
> information (assuming it ends with a full-stop, and catering for the
> possibility of a capital letter).
>
>
>
> This is untested, but it shoud do the trick based on your examples so far:
>
>
>
> df.withColumn("new_column", regexp_replace($"Description", "^\d+[A-Z]?\.", ""))
>
>
>
>
>
> *From:* Aakash Basu [mailto:aakash.spark@gmail.com]
> *Sent:* 30 October 2017 18:53
> *To:* user
> *Subject:* Split column with dynamic data
>
>
>
>  External email: use caution 
>
>
>
> Hi all,
>
>
>
> I've a requirement to split a column and fetch only the description where
> I have numbers appended before that for some rows whereas other rows have
> only the description -
>
>
>
> Eg - (Description is the column header)
>
>
>
> *Description*
>
> Inventory Tree
>
> Products
>
> 1. AT Services
>
> 2. Accessories
>
> 4. Miscellaneous
>
> 5. Service Center Items
>
> 3. 3rd Party Services
>
> Integrated Service
>
> 6. Demo Devices
>
> IT Department
>
> Merchandising
>
> 2A. Impulse
>
> 2. Handsets
>
> 3. Strategic Products
>
> 1. Opportunities
>
> 5. Features
>
> 6. Rate Plans
>
> 7. Other AT Incentives
>
> 8. Wired
>
> 4. Rate Plan Tier Change
>
> Integration SKUs (March 2016)
>
> 9. Financing SKUs
>
> 1. Smartphone
>
>
>
>
> From the above, I only want the words of description and remove the
> numbers. But since they're abrupt, it is difficult to use a single logic to
> have it done.
>
>
>
> Trying with the following options -
>
>
>
> 1) Using split by *fullstop*, together -
>
>
>
> *split_col = split(CategoryInp_DF['description'], '.')*
>
>
>
> *CategoryInp_DF= CategoryInp_DF.withColumn('unneeded',
> split_col.getItem(0))*
>
> *CategoryInp_DF= CategoryInp_DF.withColumn('description',
> split_col.getItem(1))*
>
>
>
>
>
> Result -> But both the output columns come as nulls.
>
>
>
> 2) Using split by *fullstop and space*, together -
>
> *split_col = split(CategoryInp_DF['description'], '. ')*
>
>
>
> *CategoryInp_DF= CategoryInp_DF.withColumn('unneeded',
> split_col.getItem(0))*
>
> *CategoryInp_DF= CategoryInp_DF.withColumn('description',
> split_col.getItem(1))*
>
>
>
> Result -> This is perfectly working for the numbered rows, but is
> unnecessarily splitting the non-numbered rows too.
>
>
>
> Any help would be greatly appreciated.
>
>
>
>
>
> Thanks,
>
> Aakash.
>
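For completeness, an untested PySpark sketch of the regexp_replace suggestion above; it also consumes the space after the full stop, and the sample rows are just illustrations:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("strip_prefix").getOrCreate()
CategoryInp_DF = spark.createDataFrame(
    [("Inventory Tree",), ("1. AT Services",), ("2A. Impulse",)], ["description"])

# Strip an optional leading "1." / "2A." style prefix, leaving plain descriptions untouched.
CategoryInp_DF = CategoryInp_DF.withColumn(
    "description",
    F.regexp_replace("description", r"^\d+[A-Z]?\.\s*", ""))
CategoryInp_DF.show(truncate=False)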


Split column with dynamic data

2017-10-30 Thread Aakash Basu
Hi all,

I've a requirement to split a column and fetch only the description, where
some rows have numbers prepended to the description whereas other rows have
only the description -

Eg - (Description is the column header)

*Description*
Inventory Tree
Products
1. AT Services
2. Accessories
4. Miscellaneous
5. Service Center Items
3. 3rd Party Services
Integrated Service
6. Demo Devices
IT Department
Merchandising
2A. Impulse
2. Handsets
3. Strategic Products
1. Opportunities
5. Features
6. Rate Plans
7. Other AT Incentives
8. Wired
4. Rate Plan Tier Change
Integration SKUs (March 2016)
9. Financing SKUs
1. Smartphone

From the above, I only want the words of description and remove the
numbers. But since they're abrupt, it is difficult to use a single logic to
have it done.

Trying with the following options -

1) Using split by *fullstop*, together -

*split_col = split(CategoryInp_DF['description'], '.')*

*CategoryInp_DF= CategoryInp_DF.withColumn('unneeded',
split_col.getItem(0))*
*CategoryInp_DF= CategoryInp_DF.withColumn('description',
split_col.getItem(1))*


Result -> But both the output columns come as nulls.

2) Using split by *fullstop and space*, together -

*split_col = split(CategoryInp_DF['description'], '. ')*

*CategoryInp_DF= CategoryInp_DF.withColumn('unneeded',
split_col.getItem(0))*
*CategoryInp_DF= CategoryInp_DF.withColumn('description',
split_col.getItem(1))*

Result -> This is perfectly working for the numbered rows, but is
unnecessarily splitting the non-numbered rows too.

Any help would be greatly appreciated.


Thanks,
Aakash.


XGBoost on PySpark

2018-05-19 Thread Aakash Basu
Hi guys,

I need help in implementing XG-Boost in PySpark.

As per the conversation in a popular thread regarding XGB goes, it is
available in Scala and Java versions but not Python. But, we've to
implement a pythonic distributed solution (on Spark) maybe using DMLC or
similar, to go ahead with XGB solutioning.

Anybody implemented the same? If yes, please share some insight on how to
go about it.

Thanks,
Aakash.


Fwd: XGBoost on PySpark

2018-05-23 Thread Aakash Basu
Guys any insight on the below?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Sat, May 19, 2018 at 12:21 PM
Subject: XGBoost on PySpark
To: user <user@spark.apache.org>


Hi guys,

I need help in implementing XG-Boost in PySpark.

As per the conversation in a popular thread regarding XGB goes, it is
available in Scala and Java versions but not Python. But, we've to
implement a pythonic distributed solution (on Spark) maybe using DMLC or
similar, to go ahead with XGB solutioning.

Anybody implemented the same? If yes, please share some insight on how to
go about it.

Thanks,
Aakash.


[Query] Weight of evidence on Spark

2018-05-25 Thread Aakash Basu
Hi guys,

What's the best way to create feature column with Weight of Evidence
calculated for categorical columns on target column (both Binary and
Multi-Class)?

Any insight?

Thanks,
Aakash.
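A rough sketch of one way to compute Weight of Evidence for a single categorical column against a binary target; the table and column names are placeholders, the +0.5 smoothing is an arbitrary choice, and a multi-class target would need one WoE column per target level:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("woe_example").getOrCreate()
df = spark.table("training_data")   # placeholder table with 'category' and binary 'label'

totals = df.agg(
    F.sum(F.when(F.col("label") == 1, 1).otherwise(0)).alias("tot_events"),
    F.sum(F.when(F.col("label") == 0, 1).otherwise(0)).alias("tot_non_events")).collect()[0]

woe = (df.groupBy("category")
         .agg(F.sum(F.when(F.col("label") == 1, 1).otherwise(0)).alias("events"),
              F.sum(F.when(F.col("label") == 0, 1).otherwise(0)).alias("non_events"))
         # WoE = ln( %non_events / %events ); +0.5 avoids division by zero for empty cells
         .withColumn("woe",
                     F.log(((F.col("non_events") + 0.5) / totals["tot_non_events"]) /
                           ((F.col("events") + 0.5) / totals["tot_events"]))))

df_with_woe = df.join(woe.select("category", "woe"), on="category", how="left")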


Spark 2.3 Memory Leak on Executor

2018-05-26 Thread Aakash Basu
Hi,

I am getting a memory leak warning which ideally was a Spark bug back in
version 1.6 and earlier and was resolved.

Mode: Standalone; IDE: PyCharm; Spark version: 2.3; Python version: 3.6

Below is the stack trace -

2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3148
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3152
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3151
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3150
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3149
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3153
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3154
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3158
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3155
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3157
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3160
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3161
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3156
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3159
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3165
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3163
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3162
2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected; size = 262144 bytes, TID = 3166

Any insight on why it may happen? Though my job is successfully getting
accomplished.

I've posted the query on StackOverflow too.

P.S - No connection to database is kept open (as per a comment there).

Thanks,
Aakash.


Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
Hi,

This query is one step further from the query in this link.
In this scenario, when I add 1 or 2 more columns to be processed, Spark throws
an ERROR and prints the physical plan of the queries.

It says *Resolved attribute(s) fnlwgt_bucketed#152530 missing*, which is
untrue: if I run the same code on fewer than 3 columns, where this is one of
the columns, it works like a charm, so I can assume it is not a bug in
my query or code.

Is it then an out-of-memory error? My guess is that, internally, since there
are many registered tables in memory, some of them are getting dropped due to
the overflow of data, but this is purely my assumption. Any insight on
this? Did any of you face an issue like this?

py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.:
org.apache.spark.sql.AnalysisException: Resolved attribute(s)
fnlwgt_bucketed#152530 missing from
occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13,
education#14, education-num#15, marital-status#16, occupation#17,
relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22,
hours-per-week#23, native_country#24, high_income#25,
age_bucketed#152432, fnlwgt_bucketed#152530, if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else
UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS
hours-per-week_bucketed#152299]. Attribute(s) with the same name
appear in the operation: fnlwgt_bucketed. Please check if the right
attribute(s) are used.;;Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009,
hours-per-week_bucketed#152299, age_bucketed_WoE#152431, WoE#152524 AS
fnlwgt_bucketed_WoE#152529]+- Join Inner, (fnlwgt_bucketed#99009 =
fnlwgt_bucketed#152530)
   :- SubqueryAlias bucketed
   :  +- SubqueryAlias a
   : +- Project [id_num#10, age#11, workclass#12, fnlwgt#13,
education#14, education-num#15, marital-status#16, occupation#17,
relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22,
hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009,
hours-per-week_bucketed#152299, WoE#152426 AS age_bucketed_WoE#152431]
   :+- Join Inner, (age_bucketed#48257 = age_bucketed#152432)
   :   :- SubqueryAlias bucketed
   :   :  +- SubqueryAlias a
   :   : +- Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009, if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else
UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS
hours-per-week_bucketed#152299]
   :   :+- Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, if (isnull(cast(fnlwgt#13 as double))) null else
if (isnull(cast(fnlwgt#13 as double))) null else if
(isnull(cast(fnlwgt#13 as double))) null else
UDF:bucketizer_0(cast(fnlwgt#13 as double)) AS fnlwgt_bucketed#99009]
   :   :   +- Project [id_num#10, age#11,
workclass#12, fnlwgt#13, education#14, education-num#15,
marital-status#16, occupation#17, relationship#18, race#19, sex#20,
capital-gain#21, capital-loss#22, hours-per-week#23,
native_country#24, high_income#25, if (isnull(cast(age#11 as double)))
null else if (isnull(cast(age#11 as double))) null else if
(isnull(cast(age#11 as double))) null else

[Spark Streaming] Distinct Count on unrelated columns

2018-06-06 Thread Aakash Basu
Hi guys,

Posted a question (link) on StackOverflow, any help?


Thanks,
Aakash.


[Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Aakash Basu
Hi,

I have submitted a job on a *4 node cluster*, where I see most of the
operations happening at one of the worker nodes while the other two are simply
chilling out.

Picture below puts light on that -

How to properly distribute the load?

My cluster conf (4 node cluster [1 driver; 3 slaves]) -

*Cores - 6*
*RAM - 12 GB*
*HDD - 60 GB*

My Spark Submit command is as follows -

*spark-submit --master spark://192.168.49.37:7077
 --num-executors 3 --executor-cores 5
--executor-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py*

What to do?

Thanks,
Aakash.


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Aakash Basu
Hi Jorn/Others,

Thanks for your help. Now the data is being distributed in a proper way, but
the challenge is that after a certain point I'm getting this error, after
which everything stops moving ahead -

2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
192.168.49.39: Remote RPC client disassociated. Likely due to containers
exceeding thresholds, or network issues. Check driver logs for WARN
messages.



How to avoid this scenario?

Thanks,
Aakash.

On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke  wrote:

> If it is in kB then spark will always schedule it to one node. As soon as
> it gets bigger you will see usage of more nodes.
>
> Hence increase your testing Dataset .
>
> On 11. Jun 2018, at 12:22, Aakash Basu  wrote:
>
> Jorn - The code is a series of feature engineering and model tuning
> operations. Too big to show. Yes, data volume is too low, it is in KBs,
> just tried to experiment with a small dataset before going for a large one.
>
> Akshay - I ran with your suggested spark configurations, I get this (the
> node changed, but the problem persists) -
>
> 
>
>
>
> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
> wrote:
>
>> try
>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>> spark.scheduler.mode=FAIR
>>
>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu 
>> wrote:
>>
>>> Hi,
>>>
>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>> operations happening at one of the worker nodes and other two are simply
>>> chilling out.
>>>
>>> Picture below puts light on that -
>>>
>>> How to properly distribute the load?
>>>
>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>
>>> *Cores - 6*
>>> *RAM - 12 GB*
>>> *HDD - 60 GB*
>>>
>>> My Spark Submit command is as follows -
>>>
>>> *spark-submit --master spark://192.168.49.37:7077
>>> <http://192.168.49.37:7077> --num-executors 3 --executor-cores 5
>>> --executor-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>
>>> What to do?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>>
>


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-12 Thread Aakash Basu
Hi Srinath,

Thanks for such an elaborate reply. How do I reduce the overall number of
tasks?

I found that simply repartitioning the csv file into 8 parts and
converting it to parquet with snappy compression helped not only in even
distribution of the tasks across all nodes, but also in bringing the end-to-end
job time down to approx 0.8X of the prior run.

Query - regarding "Check if there are too many partitions in the RDD and tune
it using spark.sql.shuffle.partitions": how do I do this? I have a huge
pipeline of memory and CPU intensive operations, which will have
innumerable Spark transformations. At which level should I apply the setting?
The total task count for an average dataset is going to be around 2 million
(approx); is that a bad sign? How can I control it? Do I need to re-factor my
entire pipeline (series of codes) then?

Below is the new executors show while the updated run is taking place -




Thanks,
Aakash.

On Tue, Jun 12, 2018 at 2:14 PM, Srinath C  wrote:

> Hi Aakash,
>
> Can you check the logs for Executor ID 0? It was restarted on worker
> 192.168.49.39 perhaps due to OOM or something.
>
> Also observed that the number of tasks are high and unevenly distributed
> across the workers.
> Check if there are too many partitions in the RDD and tune it using
> spark.sql.shuffle.partitions.
> If the uneven distribution is still occurring then try repartitioning the
> data set using appropriate fields.
>
> Hope that helps.
> Regards,
> Srinath.
>
>
> On Tue, Jun 12, 2018 at 1:39 PM Aakash Basu 
> wrote:
>
>> Yes, but when I did increase my executor memory, the spark job is going
>> to halt after running a few steps, even though, the executor isn't dying.
>>
>> Data - 60,000 data-points, 230 columns (60 MB data).
>>
>> Any input on why it behaves like that?
>>
>> On Tue, Jun 12, 2018 at 8:15 AM, Vamshi Talla 
>> wrote:
>>
>>> Aakash,
>>>
>>> Like Jorn suggested, did you increase your test data set? If so, did you
>>> also update your executor-memory setting? It seems like you might exceeding
>>> the executor memory threshold.
>>>
>>> Thanks
>>> Vamshi Talla
>>>
>>> Sent from my iPhone
>>>
>>> On Jun 11, 2018, at 8:54 AM, Aakash Basu 
>>> wrote:
>>>
>>> Hi Jorn/Others,
>>>
>>> Thanks for your help. Now, data is being distributed in a proper way,
>>> but the challenge is, after a certain point, I'm getting this error, after
>>> which, everything stops moving ahead -
>>>
>>> 2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
>>> 192.168.49.39
>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.39=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=T0QyzG2Sufk0kktKK3U2BVsAszvhCzx%2FFNnXOxpiWPs%3D=0>:
>>> Remote RPC client disassociated. Likely due to containers exceeding
>>> thresholds, or network issues. Check driver logs for WARN messages.
>>>
>>> 
>>>
>>> How to avoid this scenario?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke 
>>> wrote:
>>>
>>>> If it is in kB then spark will always schedule it to one node. As soon
>>>> as it gets bigger you will see usage of more nodes.
>>>>
>>>> Hence increase your testing Dataset .
>>>>
>>>> On 11. Jun 2018, at 12:22, Aakash Basu 
>>>> wrote:
>>>>
>>>> Jorn - The code is a series of feature engineering and model tuning
>>>> operations. Too big to show. Yes, data volume is too low, it is in KBs,
>>>> just tried to experiment with a small dataset before going for a large one.
>>>>
>>>> Akshay - I ran with your suggested spark configurations, I get this
>>>> (the node changed, but the problem persists) -
>>>>
>>>> 
>>>>
>>>>
>>>>
>>>> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
>>>> wrote:
>>>>
>>>>> try
>>>>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>>>>> spark.scheduler.mode=FAIR
>>>>>
>>>>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu <
>>>>> aakash.spark@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>>>>> operations happening at one of the worker nodes and other two are simply
>>>>>> chilling out.
>>>>>>
>>>>>> Picture below puts light on that -
>>>>>>
>>>>>> How to properly distribute the load?
>>>>>>
>>>>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>>>>
>>>>>> *Cores - 6*
>>>>>> *RAM - 12 GB*
>>>>>> *HDD - 60 GB*
>>>>>>
>>>>>> My Spark Submit command is as follows -
>>>>>>
>>>>>> *spark-submit --master spark://192.168.49.37:7077
>>>>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.37%3A7077=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=wS4drWE7%2FAJFXoUL3w0OzIRNL54RLKRTeMUBB%2BY1B28%3D=0>
>>>>>> --num-executors 3 --executor-cores 5 --executor-memory 4G
>>>>>> /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>>>>
>>>>>> What to do?
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
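For reference, a small sketch of the two knobs discussed in this thread; the partition counts and paths are placeholders that would need tuning to the actual cluster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition_tuning").getOrCreate()

# Controls how many partitions every shuffle (joins, aggregations) produces downstream.
spark.conf.set("spark.sql.shuffle.partitions", "24")   # e.g. roughly 2-3x total executor cores

df = spark.read.option("header", "true").csv("/path/to/input.csv")   # placeholder path

# Spread the input evenly across the workers and persist it in a splittable, compressed format.
(df.repartition(8)
   .write.mode("overwrite")
   .option("compression", "snappy")
   .parquet("/path/to/input_parquet"))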


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-12 Thread Aakash Basu
Yes, but when I did increase my executor memory, the Spark job halts
after running a few steps, even though the executor isn't dying.

Data - 60,000 data-points, 230 columns (60 MB data).

Any input on why it behaves like that?

On Tue, Jun 12, 2018 at 8:15 AM, Vamshi Talla  wrote:

> Aakash,
>
> Like Jorn suggested, did you increase your test data set? If so, did you
> also update your executor-memory setting? It seems like you might exceeding
> the executor memory threshold.
>
> Thanks
> Vamshi Talla
>
> Sent from my iPhone
>
> On Jun 11, 2018, at 8:54 AM, Aakash Basu 
> wrote:
>
> Hi Jorn/Others,
>
> Thanks for your help. Now, data is being distributed in a proper way, but
> the challenge is, after a certain point, I'm getting this error, after
> which, everything stops moving ahead -
>
> 2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
> 192.168.49.39
> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.39=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=T0QyzG2Sufk0kktKK3U2BVsAszvhCzx%2FFNnXOxpiWPs%3D=0>:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> 
>
> How to avoid this scenario?
>
> Thanks,
> Aakash.
>
> On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke  wrote:
>
>> If it is in kB then spark will always schedule it to one node. As soon as
>> it gets bigger you will see usage of more nodes.
>>
>> Hence increase your testing Dataset .
>>
>> On 11. Jun 2018, at 12:22, Aakash Basu 
>> wrote:
>>
>> Jorn - The code is a series of feature engineering and model tuning
>> operations. Too big to show. Yes, data volume is too low, it is in KBs,
>> just tried to experiment with a small dataset before going for a large one.
>>
>> Akshay - I ran with your suggested spark configurations, I get this (the
>> node changed, but the problem persists) -
>>
>> 
>>
>>
>>
>> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
>> wrote:
>>
>>> try
>>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>>> spark.scheduler.mode=FAIR
>>>
>>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu >> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>>> operations happening at one of the worker nodes and other two are simply
>>>> chilling out.
>>>>
>>>> Picture below puts light on that -
>>>>
>>>> How to properly distribute the load?
>>>>
>>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>>
>>>> *Cores - 6*
>>>> *RAM - 12 GB*
>>>> *HDD - 60 GB*
>>>>
>>>> My Spark Submit command is as follows -
>>>>
>>>> *spark-submit --master spark://192.168.49.37:7077
>>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.37%3A7077=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=wS4drWE7%2FAJFXoUL3w0OzIRNL54RLKRTeMUBB%2BY1B28%3D=0>
>>>> --num-executors 3 --executor-cores 5 --executor-memory 4G
>>>> /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>>
>>>> What to do?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>
>>>
>>
>


Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
Hi,

Getting this error when trying to run Spark Shell using YARN -

Command: *spark-shell --master yarn --deploy-mode client*

2018-06-08 13:39:09 WARN  Client:66 - Neither spark.yarn.jars nor
spark.yarn.archive is set, falling back to uploading libraries under
SPARK_HOME.
2018-06-08 13:39:25 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended!
It might have been killed or unable to launch application master.


The last half of stack-trace -

2018-06-08 13:56:11 WARN
YarnSchedulerBackend$YarnSchedulerEndpoint:66 - Attempted to request
executors before the AM has registered!
2018-06-08 13:56:11 WARN  MetricsSystem:66 - Stopping a MetricsSystem
that is not running
org.apache.spark.SparkException: Yarn application has already ended!
It might have been killed or unable to launch application master.
  at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:89)
  at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:63)
  at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
  at org.apache.spark.SparkContext.(SparkContext.scala:500)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
  at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
  at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
  at scala.Option.getOrElse(Option.scala:121)
  at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
  ... 55 elided
<console>:14: error: not found: value spark
   import spark.implicits._
  ^
<console>:14: error: not found: value spark
   import spark.sql


Tried putting the *spark-yarn_2.11-2.3.0.jar *in Hadoop yarn, still not
working, anything else to do?

Thanks,
Aakash.


Re: Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
p/py4j/java_gateway.py",
line 1428, in __call__
  File
"/appdata/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py",
line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Spark context stopped while waiting for
backend
at
org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:669)
at
org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:177)
at org.apache.spark.SparkContext.(SparkContext.scala:558)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

2018-06-08 14:26:59 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-06-08 14:26:59 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-06-08 14:26:59 INFO  SparkContext:54 - Successfully stopped
SparkContext
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/appdata/spark/tmp/spark-35d9709e-8f20-4b57-82d3-f3ef0926d3ab
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-1b471b46-0c5a-4f75-94c1-c99d9d674228

Seems the name-node and data-nodes cannot talk to each other correctly,
why, no clue, anyone faced this problem, any help on this?

Thanks,
Aakash.


On Fri, Jun 8, 2018 at 2:31 PM, Sathishkumar Manimoorthy <
mrsathishkuma...@gmail.com> wrote:

> It seems, your spark-on-yarn application is not able to get it's
> application master,
>
> org.apache.spark.SparkException: Yarn application has already ended! It might 
> have been killed or unable to launch application master.
>
>
> Check once on yarn logs
>
> Thanks,
> Sathish-
>
>
> On Fri, Jun 8, 2018 at 2:22 PM, Jeff Zhang  wrote:
>
>>
>> Check the yarn AM log for details.
>>
>>
>>
>> Aakash Basu wrote on Fri, Jun 8, 2018 at 4:36 PM:
>>
>>> Hi,
>>>
>>> Getting this error when trying to run Spark Shell using YARN -
>>>
>>> Command: *spark-shell --master yarn --deploy-mode client*
>>>
>>> 2018-06-08 13:39:09 WARN  Client:66 - Neither spark.yarn.jars nor 
>>> spark.yarn.archive is set, falling back to uploading libraries under 
>>> SPARK_HOME.
>>> 2018-06-08 13:39:25 ERROR SparkContext:91 - Error initializing SparkContext.
>>> org.apache.spark.SparkException: Yarn application has already ended! It 
>>> might have been killed or unable to launch application master.
>>>
>>>
>>> The last half of stack-trace -
>>>
>>> 2018-06-08 13:56:11 WARN  YarnSchedulerBackend$YarnSchedulerEndpoint:66 - 
>>> Attempted to request executors before the AM has registered!
>>> 2018-06-08 13:56:11 WARN  MetricsSystem:66 - Stopping a MetricsSystem that 
>>> is not running
>>> org.apache.spark.SparkException: Yarn application has already ended! It 
>>> might have been killed or unable to launch application master.
>>>   at 
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:89)
>>>   at 
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:63)
>>>   at 
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
>>>   at org.apache.spark.SparkContext.(SparkContext.scala:500)
>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>>>   at scala.Option.getOrElse(Option.scala:121)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
>>>   at org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
>>>   ... 55 elided
>>> <console>:14: error: not found: value spark
>>>import spark.implicits._
>>>   ^
>>> <console>:14: error: not found: value spark
>>>import spark.sql
>>>
>>>
>>> Tried putting the *spark-yarn_2.11-2.3.0.jar *in Hadoop yarn, still not
>>> working, anything else to do?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>


Re: Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
p/py4j/java_gateway.py",
line 1428, in __call__
  File
"/appdata/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py",
line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Spark context stopped while waiting for
backend
at
org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:669)
at
org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:177)
at org.apache.spark.SparkContext.(SparkContext.scala:558)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

2018-06-08 14:26:59 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-06-08 14:26:59 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-06-08 14:26:59 INFO  SparkContext:54 - Successfully stopped
SparkContext
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/appdata/spark/tmp/spark-35d9709e-8f20-4b57-82d3-f3ef0926d3ab
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-1b471b46-0c5a-4f75-94c1-c99d9d674228

Seems the name-node and data-nodes cannot talk to each other correctly,
why, no clue, anyone faced this problem, any help on this?

Thanks,
Aakash.

On Fri, Jun 8, 2018 at 2:17 PM, Saisai Shao  wrote:

> In Spark on YARN, error code 13 means SparkContext doesn't initialize in
> time. You can check the yarn application log to get more information.
>
> BTW, did you just write a plain python script without creating
> SparkContext/SparkSession?
>
> Aakash Basu wrote on Fri, Jun 8, 2018 at 4:15 PM:
>
>> Hi,
>>
>> I'm trying to run a program on a cluster using YARN.
>>
>> YARN is present there along with HADOOP.
>>
>> Problem I'm running into is as below -
>>
>> Container exited with a non-zero exit code 13
>>> Failing this attempt. Failing the application.
>>>  ApplicationMaster host: N/A
>>>  ApplicationMaster RPC port: -1
>>>  queue: default
>>>  start time: 1528297574594
>>>  final status: FAILED
>>>  tracking URL: http://MasterNode:8088/cluster/app/application_
>>> 1528296308262_0004
>>>  user: bblite
>>> Exception in thread "main" org.apache.spark.SparkException: Application
>>> application_1528296308262_0004 finished with failed status
>>>
>>
>> I checked on the net and most of the stackoverflow problems say, that the
>> users have given *.master('local[*]')* in the code while invoking the
>> Spark Session and at the same time, giving *--master yarn* while doing
>> the spark-submit, hence they're getting the error due to conflict.
>>
>> But, in my case, I've not mentioned any master at all at the code. Just
>> trying to run it on yarn by giving *--master yarn* while doing the
>> spark-submit. Below is the code spark invoking -
>>
>> spark = SparkSession\
>> .builder\
>> .appName("Temp_Prog")\
>> .getOrCreate()
>>
>> Below is the spark-submit -
>>
>> *spark-submit --master yarn --deploy-mode cluster --num-executors 3
>> --executor-cores 6 --executor-memory 4G
>> /appdata/codebase/backend/feature_extraction/try_yarn.py*
>>
>> I've tried without --deploy-mode too, still no help.
>>
>> Thanks,
>> Aakash.
>>
>


Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
Hi,

I'm trying to run a program on a cluster using YARN.

YARN is present there along with HADOOP.

Problem I'm running into is as below -

Container exited with a non-zero exit code 13
> Failing this attempt. Failing the application.
>  ApplicationMaster host: N/A
>  ApplicationMaster RPC port: -1
>  queue: default
>  start time: 1528297574594
>  final status: FAILED
>  tracking URL:
> http://MasterNode:8088/cluster/app/application_1528296308262_0004
>  user: bblite
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1528296308262_0004 finished with failed status
>

I checked on the net and most of the stackoverflow problems say, that the
users have given *.master('local[*]')* in the code while invoking the Spark
Session and at the same time, giving *--master yarn* while doing the
spark-submit, hence they're getting the error due to conflict.

But, in my case, I've not mentioned any master at all at the code. Just
trying to run it on yarn by giving *--master yarn* while doing the
spark-submit. Below is the code spark invoking -

spark = SparkSession\
.builder\
.appName("Temp_Prog")\
.getOrCreate()

Below is the spark-submit -

*spark-submit --master yarn --deploy-mode cluster --num-executors 3
--executor-cores 6 --executor-memory 4G
/appdata/codebase/backend/feature_extraction/try_yarn.py*

I've tried without --deploy-mode too, still no help.

Thanks,
Aakash.


Re: Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
Fixed by adding 2 configurations in yarn-site.xml.

Thanks all!


On Fri, Jun 8, 2018 at 2:44 PM, Aakash Basu 
wrote:

> Hi,
>
> I fixed that problem by putting all the Spark JARS in spark-archive.zip
> and putting it in the HDFS (as that problem was happening for that reason) -
>
> But, I'm facing a new issue now, this is the new RPC error I get
> (Stack-Trace below) -
>
>
>
>
> 2018-06-08 14:26:43 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Running Spark version 2.3.0
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Submitted application:
> EndToEnd_FeatureEngineeringPipeline
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - SecurityManager:
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(bblite); groups with view permissions: Set(); users  with modify
> permissions: Set(bblite); groups with modify permissions: Set()
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'sparkDriver' on port 41957.
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering MapOutputTracker
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering BlockManagerMaster
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 - Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 -
> BlockManagerMasterEndpoint up
> 2018-06-08 14:26:45 INFO  DiskBlockManager:54 - Created local directory at
> /appdata/spark/tmp/blockmgr-7b035871-a1f7-47ff-aad8-f7a43367836e
> 2018-06-08 14:26:45 INFO  MemoryStore:54 - MemoryStore started with
> capacity 366.3 MB
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
> 2018-06-08 14:26:45 INFO  log:192 - Logging initialized @3659ms
> 2018-06-08 14:26:45 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
> 2018-06-08 14:26:45 INFO  Server:414 - Started @3733ms
> 2018-06-08 14:26:45 INFO  AbstractConnector:278 - Started
> ServerConnector@3080efb7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'SparkUI' on port 4040.
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@2c3409b5{/jobs,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7f1ba569{/jobs/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@493631a1{/jobs/job,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@6b12f33c{/jobs/job/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@490023da{/stages,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@31c3a862{/stages/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@4da2454f{/stages/stage,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@552f182d{/stages/stage/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@a78a7fa{/stages/pool,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@15142105{/stages/pool/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7589c977{/storage,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@584a599b{/storage/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1742621f{/storage/rdd,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@23ea75fb{/storage/rdd/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1813d280{/environment,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@129fc698{/environment/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  

Re: Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
Fixed by adding 2 configurations in yarn-site.xml.

Thanks all!

On Fri, Jun 8, 2018 at 2:44 PM, Aakash Basu 
wrote:

> Hi,
>
> I fixed that problem by putting all the Spark JARS in spark-archive.zip
> and putting it in the HDFS (as that problem was happening for that reason) -
>
> But, I'm facing a new issue now, this is the new RPC error I get
> (Stack-Trace below) -
>
>
>
>
> 2018-06-08 14:26:43 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Running Spark version 2.3.0
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Submitted application:
> EndToEnd_FeatureEngineeringPipeline
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - SecurityManager:
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(bblite); groups with view permissions: Set(); users  with modify
> permissions: Set(bblite); groups with modify permissions: Set()
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'sparkDriver' on port 41957.
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering MapOutputTracker
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering BlockManagerMaster
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 - Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 -
> BlockManagerMasterEndpoint up
> 2018-06-08 14:26:45 INFO  DiskBlockManager:54 - Created local directory at
> /appdata/spark/tmp/blockmgr-7b035871-a1f7-47ff-aad8-f7a43367836e
> 2018-06-08 14:26:45 INFO  MemoryStore:54 - MemoryStore started with
> capacity 366.3 MB
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
> 2018-06-08 14:26:45 INFO  log:192 - Logging initialized @3659ms
> 2018-06-08 14:26:45 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
> 2018-06-08 14:26:45 INFO  Server:414 - Started @3733ms
> 2018-06-08 14:26:45 INFO  AbstractConnector:278 - Started
> ServerConnector@3080efb7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'SparkUI' on port 4040.
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@2c3409b5{/jobs,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7f1ba569{/jobs/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@493631a1{/jobs/job,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@6b12f33c{/jobs/job/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@490023da{/stages,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@31c3a862{/stages/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@4da2454f{/stages/stage,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@552f182d{/stages/stage/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@a78a7fa{/stages/pool,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@15142105{/stages/pool/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7589c977{/storage,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@584a599b{/storage/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1742621f{/storage/rdd,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@23ea75fb{/storage/rdd/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1813d280{/environment,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@129fc698{/environment/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  

Using G1GC in Spark

2018-06-14 Thread Aakash Basu
Hi,

I am trying to spark submit with G1GC for garbage collection, but it isn't
working.

What is the way to deploy a spark job with G1GC?

Tried -

*spark-submit --master spark://192.168.60.20:7077
 --conf -XX:+UseG1GC
/appdata/bblite-codebase/test.py*

Didn't work.

Tried -

*spark-submit --master spark://192.168.60.20:7077
 -XX:+UseG1GC /appdata/bblite-codebase/test.py*

Still didn't work. Would appreciate some help on this.

Thanks,
Aakash.
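For what it's worth, JVM flags are normally passed through the extraJavaOptions properties (or --driver-java-options for a client-mode driver) rather than given to spark-submit directly; a sketch, reusing the master URL and script path from the attempts above:

spark-submit --master spark://192.168.60.20:7077 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
  --driver-java-options "-XX:+UseG1GC" \
  /appdata/bblite-codebase/test.py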


StackOverFlow ERROR - Bulk interaction for many columns fail

2018-06-18 Thread Aakash Basu
Hi,

When doing bulk interaction on around 60 columns, I want 3 columns to be
created out of each pair of them, so across all pairwise combinations it
becomes 60C2 * 3, which creates a lot of columns.

So, for fewer than 50-60 columns, even though it takes time, it still
works fine, but for a slightly larger number of columns it throws this
error -

  File
> "/usr/local/lib/python3.5/dist-packages/backend/feature_extraction/cont_bulk_interactions.py",
> line 100, in bulktransformer_pairs
> df = df.withColumn(col_name, each_op(df[var_2], df[var_1]))
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 1849, in withColumn
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o26887.withColumn.
> : java.lang.StackOverflowError
> at
> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:70)
> at
> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:104)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:57)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:52)
> at
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:229)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>

What to do?

Thanks,
Aakash.
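
For readers hitting the same java.lang.StackOverflowError: the overflow usually
comes from the analyzer recursing over a logical plan that has grown very deep
after hundreds of chained withColumn calls, not from the data volume. A common
workaround (a sketch only, not from this thread; it assumes an existing
SparkSession named spark, a DataFrame df, and a writable checkpoint directory)
is to truncate the lineage every few dozen columns:

from pyspark.sql import functions as F

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # assumption: any writable path

def add_columns_with_truncated_lineage(df, col_exprs, batch_size=50):
    """col_exprs: dict of {new_column_name: Column expression}.
    Checkpoint every `batch_size` columns so the plan never gets deep
    enough to blow the JVM stack during analysis/optimisation."""
    for i, (name, expr) in enumerate(col_exprs.items(), start=1):
        df = df.withColumn(name, expr)
        if i % batch_size == 0:
            df = df.checkpoint(eager=True)  # cuts the lineage; persist() alone does not
    return df

# hypothetical usage: pairwise interactions between numeric columns
exprs = {a + "_times_" + b: F.col(a) * F.col(b)
         for a in df.columns for b in df.columns if a < b}
df = add_columns_with_truncated_lineage(df, exprs)

The checkpoint adds I/O of its own, so the batch size is a trade-off between
plan depth and extra writes.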


Fwd: StackOverFlow ERROR - Bulk interaction for many columns fail

2018-06-18 Thread Aakash Basu
*Correction, 60C2 * 3*


-- Forwarded message --
From: Aakash Basu 
Date: Mon, Jun 18, 2018 at 4:15 PM
Subject: StackOverFlow ERROR - Bulk interaction for many columns fail
To: user 


Hi,

When doing bulk interactions on around 60 columns, I want 3 columns to be
created out of each pair of them, so it becomes 60C2 * 3, which creates a
lot of columns.

So, for fewer than 50 - 60 columns, even though it takes time, it still
works fine, but for a slightly larger number of columns, it throws this
error -

  File "/usr/local/lib/python3.5/dist-packages/backend/feature_
> extraction/cont_bulk_interactions.py", line 100, in bulktransformer_pairs
> df = df.withColumn(col_name, each_op(df[var_2], df[var_1]))
>   File 
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 1849, in withColumn
>   File "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.
> 10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File 
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.
> 10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o26887.withColumn.
> : java.lang.StackOverflowError
> at scala.collection.generic.GenericTraversableTemplate$
> class.genericBuilder(GenericTraversableTemplate.scala:70)
> at scala.collection.AbstractTraversable.genericBuilder(Traversable.
> scala:104)
> at scala.collection.generic.GenTraversableFactory$
> GenericCanBuildFrom.apply(GenTraversableFactory.scala:57)
> at scala.collection.generic.GenTraversableFactory$
> GenericCanBuildFrom.apply(GenTraversableFactory.scala:52)
> at scala.collection.TraversableLike$class.builder$
> 1(TraversableLike.scala:229)
> at scala.collection.TraversableLike$class.map(
> TraversableLike.scala:233)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>

What to do?

Thanks,
Aakash.


[Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread Aakash Basu
Hi guys,

I'm getting an error when I'm feature engineering on 30+ columns to create
about 200+ columns. It is not failing the job, but the ERROR shows up. I want
to know how I can avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores


Input data - 8 partitions of parquet file with snappy compression.

My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
--num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
--driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
spark.driver.maxResultSize=2G --conf
"spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py >
/appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
> Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
> grows beyond 64 KB
> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
> Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
> grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
> at
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
> at
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
> at
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> at
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> at
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> at
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
> at
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> at
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> 

Inferring from Event Timeline

2018-06-13 Thread Aakash Basu
Hi guys,

What all can be inferred by closely watching an event timeline in the Spark
UI? I generally monitor the tasks taking more time and also how many of them
run in parallel.

What else?

Eg Event Time-line from Spark UI:


Thanks,
Aakash.


Re: Using G1GC in Spark

2018-06-14 Thread Aakash Basu
Thanks a lot Srinath, for your perpetual help.

On Thu, Jun 14, 2018 at 5:49 PM, Srinath C  wrote:

> You'll have to use "spark.executor.extraJavaOptions" configuration
> parameter:
> See documentation link
> <https://spark.apache.org/docs/latest/configuration.html#runtime-environment>
> .
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>
> Regards,
> Srinath.
>
>
> On Thu, Jun 14, 2018 at 4:44 PM Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I am trying to spark submit with G1GC for garbage collection, but it
>> isn't working.
>>
>> What is the way to deploy a spark job with G1GC?
>>
>> Tried -
>>
>> *spark-submit --master spark://192.168.60.20:7077
>> <http://192.168.60.20:7077> --conf -XX:+UseG1GC
>> /appdata/bblite-codebase/test.py*
>>
>> Didn't work.
>>
>> Tried -
>>
>> *spark-submit --master spark://192.168.60.20:7077
>> <http://192.168.60.20:7077> -XX:+UseG1GC /appdata/bblite-codebase/test.py*
>>
>> Still didn't work. Would appreciate a help on this.
>>
>> Thanks,
>> Aakash.
>>
>


Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread Aakash Basu
Hi,

I already went through it; that's one use case. I have a complex and very big
pipeline of multiple jobs under one Spark session. I'm not getting how to
solve this, as it is happening over the Logistic Regression and Random Forest
models, which I'm just using from the Spark ML package rather than
implementing anything myself.

Thanks,
Aakash.

On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:

> Hi Akash,
>
> Please check stackoverflow.
>
>
> https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu 
> wrote:
>
>> Hi guys,
>>
>> I'm getting an error when I'm feature engineering on 30+ columns to
>> create about 200+ columns. It is not failing the job, but the ERROR shows.
>> I want to know how can I avoid this.
>>
>> Spark - 2.3.1
>> Python - 3.6
>>
>> Cluster Config -
>> 1 Master - 32 GB RAM, 16 Cores
>> 4 Slaves - 16 GB RAM, 8 Cores
>>
>>
>> Input data - 8 partitions of parquet file with snappy compression.
>>
>> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
>> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
>> --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
>> spark.driver.maxResultSize=2G --conf
>> "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
>> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
>> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py >
>> /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>>
>> Stack-Trace below -
>>
>> ERROR CodeGenerator:91 - failed to compile:
>>> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
>>> Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> org.codehaus.janino.InternalCompilerException: Compiling
>>> "GeneratedClass": Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> at
>>> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>>> at
>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>

Issue upgrading to Spark 2.3.1 (Maintenance Release)

2018-06-14 Thread Aakash Basu
Hi,

Downloaded the latest Spark version because the of the fix for "ERROR
AsyncEventQueue:70 - Dropping event from queue appStatus."

After setting environment variables and running the same code in PyCharm,
I'm getting this error, which I can't find a solution of.

Exception in thread "main" java.util.NoSuchElementException: key not found:
_PYSPARK_DRIVER_CONN_INFO_PATH
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at
org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:64)
at
org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Any help?

Thanks,
Aakash.


Crosstab/ApproxQuantile Performance on Spark Cluster

2018-06-14 Thread Aakash Basu
Hi all,

Does the Event Timeline look healthy? At one point, to calculate WoE columns
on categorical variables, I have to do a crosstab on each column, and on a
cluster of 4 nodes it is taking a long time since I have 230+ columns and
60,000 rows. How can I make it more performant?





Thanks,
Aakash.
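
A per-column crosstab launches its own aggregation over the full dataset, so
230+ columns means 230+ passes and shuffles. One way to cut that down (a sketch
only, assuming a DataFrame df, a Python list cat_cols with the categorical
column names, and a label column name in target_col) is to explode
(column, value) pairs and count everything in a single groupBy:

from pyspark.sql import functions as F

# one struct per categorical column, values cast to string so the array is homogeneous
pairs = F.explode(F.array(*[
    F.struct(F.lit(c).alias("column"), F.col(c).cast("string").alias("value"))
    for c in cat_cols
])).alias("pair")

counts = (df
          .select(pairs, F.col(target_col).alias("target"))
          .select("pair.column", "pair.value", "target")
          .groupBy("column", "value", "target")
          .count())          # one shuffle for all columns instead of one per column

counts.cache()  # small result (levels x columns x 2) that can feed the WoE step

This trades one wide pass over the data for many narrow ones, which usually
helps when the row count is small but the column count is large.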


Understanding Event Timeline of Spark UI

2018-06-15 Thread Aakash Basu
Hi,

I have a job running which shows the Event Timeline as follows. I am trying to
understand the gaps between these thin lines; the stages seem to run in
parallel but not immediately one after another.

Any other insight from this? And what is the cluster doing during these
gaps?




Thanks,
Aakash.


Re: Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
The question is: if I register using registerTempTable() and drop using
dropTempView(), would it hit the same TempTable internally, or would it
search for a separately registered view? Not sure. Any idea?

On Sat, May 26, 2018 at 9:04 PM, SNEHASISH DUTTA <info.snehas...@gmail.com>
wrote:

> I think it's dropTempView
>
> On Sat, May 26, 2018, 8:56 PM Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm trying to use dropTempTable() after the respective Temporary Table's
>> use is over (to free up the memory for next calculations).
>>
>> Newer Spark Session doesn't need sqlContext, so, it is confusing me on
>> how to use the function.
>>
>> 1) Tried, same DF which I used to register a temp table to do -
>>
>> DF.dropTempTable('xyz')
>>
>> Didn't work.
>>
>> 2) Tried following way too, as spark internally invokes sqlContext too
>> along with sparkContext, but didn't work -
>>
>> spark.dropTempTable('xyz')
>>
>> 3) Tried spark.catalog to drop, this failed too -
>>
>> spark.catalog.dropTempTable('xyz')
>>
>>
>> What to do? 1.6 examples on internet are not working in the 2.3 version
>> for dropTempTable().
>>
>> Thanks,
>> Aakash.
>>
>


Re: Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
Well, it did work, meaning internally a TempTable and a TempView are the same.

Thanks buddy!

On Sat, May 26, 2018 at 9:23 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Question is, while registering, using registerTempTable() and while
> dropping, using a dropTempView(), would it go and hit the same TempTable
> internally or would search for a registered view? Not sure. Any idea?
>
> On Sat, May 26, 2018 at 9:04 PM, SNEHASISH DUTTA <info.snehas...@gmail.com
> > wrote:
>
>> I think it's dropTempView
>>
>> On Sat, May 26, 2018, 8:56 PM Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to use dropTempTable() after the respective Temporary Table's
>>> use is over (to free up the memory for next calculations).
>>>
>>> Newer Spark Session doesn't need sqlContext, so, it is confusing me on
>>> how to use the function.
>>>
>>> 1) Tried, same DF which I used to register a temp table to do -
>>>
>>> DF.dropTempTable('xyz')
>>>
>>> Didn't work.
>>>
>>> 2) Tried following way too, as spark internally invokes sqlContext too
>>> along with sparkContext, but didn't work -
>>>
>>> spark.dropTempTable('xyz')
>>>
>>> 3) Tried spark.catalog to drop, this failed too -
>>>
>>> spark.catalog.dropTempTable('xyz')
>>>
>>>
>>> What to do? 1.6 examples on internet are not working in the 2.3 version
>>> for dropTempTable().
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>
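
For reference, in Spark 2.x registerTempTable() is just a deprecated alias of
createOrReplaceTempView(), which is why dropTempView() finds the same entry. A
minimal round trip (a sketch, assuming an existing SparkSession spark and
DataFrame df) looks like:

df.createOrReplaceTempView("xyz")            # preferred 2.x API; registerTempTable is the old alias
spark.sql("SELECT COUNT(*) FROM xyz").show()

spark.catalog.dropTempView("xyz")            # removes only the catalog entry
# the DataFrame df itself is unaffected; just the name "xyz" is released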


Re: Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
You're right.

The same set of queries works for at most 2 columns in the loop.

If I give more than 2 columns, the 2nd column fails with this error -

*attribute(s) with the same name appear in the operation:
marginal_adhesion_bucketed. Please check if the right attribute(s) are
used.*

Any idea on what may be the reason?

I rechecked the query; it has the correct logic.

On Sat, May 26, 2018 at 9:35 PM, hemant singh <hemant2...@gmail.com> wrote:

> Per the sql plan this is where it is failing -
>
> Attribute(s) with the same name appear in the operation: fnlwgt_bucketed. 
> Please check if the right attribute(s) are used.;
>
>
>
> On Sat, May 26, 2018 at 6:16 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi,
>>
>> This query is based on one step further from the query in this link
>> <https://stackoverflow.com/questions/50530679/spark-2-3-asynceventqueue-error-and-warning>.
>> In this scenario, I add 1 or 2 more columns to be processed, Spark throws
>> an ERROR by printing the physical plan of queries.
>>
>> It says, *Resolved attribute(s) fnlwgt_bucketed#152530 missing* which is
>> untrue, as if I run the same code on less than 3 columns where this is one
>> column, it works like a charm, so I can clearly assume it is not a bug in
>> my query or code.
>>
>> Is it then an out-of-memory error? My assumption is that, internally, since
>> there are many tables registered in memory, they're getting evicted due to
>> data overflow. Any insight on this? Did any of you face an issue like this?
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.: 
>> org.apache.spark.sql.AnalysisException: Resolved attribute(s) 
>> fnlwgt_bucketed#152530 missing from 
>> occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
>>  in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation#17, 
>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#152432, 
>> fnlwgt_bucketed#152530, if (isnull(cast(hours-per-week#23 as double))) null 
>> else if (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else 
>> UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS 
>> hours-per-week_bucketed#152299]. Attribute(s) with the same name appear in 
>> the operation: fnlwgt_bucketed. Please check if the right attribute(s) are 
>> used.;;Project [id_num#10, age#11, workclass#12, fnlwgt#13, education#14, 
>> education-num#15, marital-status#16, occupation#17, relationship#18, 
>> race#19, sex#20, capital-gain#21, capital-loss#22, hours-per-week#23, 
>> native_country#24, high_income#25, age_bucketed#48257, 
>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, 
>> age_bucketed_WoE#152431, WoE#152524 AS fnlwgt_bucketed_WoE#152529]+- Join 
>> Inner, (fnlwgt_bucketed#99009 = fnlwgt_bucketed#152530)
>>:- SubqueryAlias bucketed
>>:  +- SubqueryAlias a
>>: +- Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation#17, 
>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#48257, 
>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, WoE#152426 AS 
>> age_bucketed_WoE#152431]
>>:+- Join Inner, (age_bucketed#48257 = age_bucketed#152432)
>>:   :- SubqueryAlias bucketed
>>:   :  +- SubqueryAlias a
>>:   : +- Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation

Re: Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
I think I found the solution.

The last comment from this link -
https://issues.apache.org/jira/browse/SPARK-14948

But my question is: even after using table.column, why does Spark find the
same column name from two different tables ambiguous?

I mean, for table1.column = table2.column, Spark should understand that even
though the column names are the same, they come from two different tables,
shouldn't it?

Well, I'll try out the solution provided above, and see if it works for me.

Thanks!

On Sat, May 26, 2018 at 9:45 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> You're right.
>
> The same set of queries are working for max 2 columns in loop.
>
> If I give more than 2 column, the 2nd column is failing with this error -
>
> *attribute(s) with the same name appear in the operation:
> marginal_adhesion_bucketed. Please check if the right attribute(s) are
> used.*
>
> Any idea on what maybe the reason?
>
> I rechecked the query, its has correct logic.
>
> On Sat, May 26, 2018 at 9:35 PM, hemant singh <hemant2...@gmail.com>
> wrote:
>
>> Per the sql plan this is where it is failing -
>>
>> Attribute(s) with the same name appear in the operation: fnlwgt_bucketed. 
>> Please check if the right attribute(s) are used.;
>>
>>
>>
>> On Sat, May 26, 2018 at 6:16 PM, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> This query is based on one step further from the query in this link
>>> <https://stackoverflow.com/questions/50530679/spark-2-3-asynceventqueue-error-and-warning>.
>>> In this scenario, I add 1 or 2 more columns to be processed, Spark throws
>>> an ERROR by printing the physical plan of queries.
>>>
>>> It says, *Resolved attribute(s) fnlwgt_bucketed#152530 missing* which
>>> is untrue, as if I run the same code on less than 3 columns where this is
>>> one column, it works like a charm, so I can clearly assume it is not a bug
>>> in my query or code.
>>>
>>> Is it then an out-of-memory error? My assumption is that, internally,
>>> since there are many tables registered in memory, they're getting evicted
>>> due to data overflow. Any insight on this? Did any of you face an issue
>>> like this?
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.: 
>>> org.apache.spark.sql.AnalysisException: Resolved attribute(s) 
>>> fnlwgt_bucketed#152530 missing from 
>>> occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
>>>  in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>>> education#14, education-num#15, marital-status#16, occupation#17, 
>>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#152432, 
>>> fnlwgt_bucketed#152530, if (isnull(cast(hours-per-week#23 as double))) null 
>>> else if (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else 
>>> UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS 
>>> hours-per-week_bucketed#152299]. Attribute(s) with the same name appear in 
>>> the operation: fnlwgt_bucketed. Please check if the right attribute(s) are 
>>> used.;;Project [id_num#10, age#11, workclass#12, fnlwgt#13, education#14, 
>>> education-num#15, marital-status#16, occupation#17, relationship#18, 
>>> race#19, sex#20, capital-gain#21, capital-loss#22, hours-per-week#23, 
>>> native_country#24, high_income#25, age_bucketed#48257, 
>>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, 
>>> age_bucketed_WoE#152431, WoE#152524 AS fnlwgt_bucketed_WoE#152529]+- Join 
>>&g
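
For anyone hitting the same analyzer complaint when joining DataFrames that
share lineage, the workaround usually suggested is to give each side an
explicit alias and project every column through that alias, so the two sides
stop competing for the same attribute references. A sketch only, using
hypothetical DataFrames bucketed_df and woe_df named after the plan above:

from pyspark.sql import functions as F

left = bucketed_df.alias("l")
right = woe_df.select("bucketed_col_of_source", "WoE").alias("r")

joined = (left
          .join(right, F.col("l.fnlwgt_bucketed") == F.col("r.bucketed_col_of_source"), "inner")
          .select([F.col("l." + c) for c in bucketed_df.columns]
                  + [F.col("r.WoE").alias("fnlwgt_bucketed_WoE")]))

If aliasing is not enough because both sides really do carry the same attribute
IDs (typical when one was derived from the other), recreating one side with
fresh attributes, e.g. spark.createDataFrame(df.rdd, df.schema) or
df.checkpoint(), usually clears the error.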

Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
Hi all,

I'm trying to use dropTempTable() after the respective Temporary Table's
use is over (to free up the memory for next calculations).

Newer Spark Session doesn't need sqlContext, so, it is confusing me on how
to use the function.

1) Tried, same DF which I used to register a temp table to do -

DF.dropTempTable('xyz')

Didn't work.

2) Tried following way too, as spark internally invokes sqlContext too
along with sparkContext, but didn't work -

spark.dropTempTable('xyz')

3) Tried spark.catalog to drop, this failed too -

spark.catalog.dropTempTable('xyz')


What to do? 1.6 examples on internet are not working in the 2.3 version for
dropTempTable().

Thanks,
Aakash.


Fwd: [Help] PySpark Dynamic mean calculation

2018-05-31 Thread Aakash Basu
Solved it myself.

Posting the code below in case anyone needs to reuse it.

orig_list = ['Married-spouse-absent', 'Married-AF-spouse', 'Separated',
             'Married-civ-spouse', 'Widowed', 'Divorced', 'Never-married']
k_folds = 3

# df.columns looks like: ['fnlwgt_bucketed',
#  'Married-spouse-absent_fold_0', 'Married-AF-spouse_fold_0', 'Separated_fold_0',
#  'Married-civ-spouse_fold_0', 'Widowed_fold_0', 'Divorced_fold_0', 'Never-married_fold_0',
#  'Married-spouse-absent_fold_1', 'Married-AF-spouse_fold_1', 'Separated_fold_1',
#  'Married-civ-spouse_fold_1', 'Widowed_fold_1', 'Divorced_fold_1', 'Never-married_fold_1',
#  'Married-spouse-absent_fold_2', 'Married-AF-spouse_fold_2', 'Separated_fold_2',
#  'Married-civ-spouse_fold_2', 'Widowed_fold_2', 'Divorced_fold_2', 'Never-married_fold_2']
cols = df.columns

for folds in range(k_folds):
    for column in orig_list:
        # columns of the *other* folds for this variable
        col_namer = []
        for fold in range(k_folds):
            if fold != folds:
                col_namer.append(column + '_fold_' + str(fold))
        # mean of the other folds' columns
        df = df.withColumn(column + '_fold_' + str(folds) + '_mean',
                           sum(df[col] for col in col_namer) / (k_folds - 1))
        print(col_namer)
df.show(1)



-- Forwarded message --
From: Aakash Basu 
Date: Thu, May 31, 2018 at 3:40 PM
Subject: [Help] PySpark Dynamic mean calculation
To: user 


Hi,

Using -
Python 3.6
Spark 2.3

Original DF -
key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2
1 1 2 3 4 5 6
2 7 5 3 5 2 1


I want to calculate means from the below  dataframe as follows (like this
for all columns and all folds) -

key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2 a_fold_0_mean
b_fold_0_mean a_fold_1_mean
1 1 2 3 4 5 6 3 + 5 / 2 4 + 6 / 2 1 + 5 / 2
2 7 5 3 5 2 1 3 + 2 / 2 5 + 1 / 2 7 + 2 / 2

Process -

For fold_0 my mean should be fold_1 + fold_2 / 2
For fold_1 my mean should be fold_0 + fold_2 / 2
For fold_2 my mean should be fold_0 + fold_1 / 2

For each column.

And my number of columns, no. of folds, everything would be dynamic.

How to go about this problem on a pyspark dataframe?

Thanks,
Aakash.


[Help] PySpark Dynamic mean calculation

2018-05-31 Thread Aakash Basu
Hi,

Using -
Python 3.6
Spark 2.3

Original DF -
key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2
1 1 2 3 4 5 6
2 7 5 3 5 2 1


I want to calculate means from the below  dataframe as follows (like this
for all columns and all folds) -

key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2 a_fold_0_mean
b_fold_0_mean a_fold_1_mean
1 1 2 3 4 5 6 3 + 5 / 2 4 + 6 / 2 1 + 5 / 2
2 7 5 3 5 2 1 3 + 2 / 2 5 + 1 / 2 7 + 2 / 2

Process -

For fold_0 my mean should be fold_1 + fold_2 / 2
For fold_1 my mean should be fold_0 + fold_2 / 2
For fold_2 my mean should be fold_0 + fold_1 / 2

For each column.

And my number of columns, no. of folds, everything would be dynamic.

How to go about this problem on a pyspark dataframe?

Thanks,
Aakash.


[Suggestions needed] Weight of Evidence PySpark

2018-05-31 Thread Aakash Basu
Hi guys,

I'm trying to calculate WoE on a particular categorical column depending on
the target column. But the code is taking a lot of time on very few
datapoints (rows).

How can I optimize it to make it performant enough?

Here's the code (here categorical_col is a python list of columns) -

for item in categorical_col:
    new_df = spark.sql('Select `' + item + '`, `' + target_col + '`, count(*) as Counts '
                       'from a group by `' + item + '`, `' + target_col + '` order by `' + item + '`')
    # new_df.show()
    new_df.registerTempTable('b')
    # exit(0)
    new_df2 = spark.sql('Select `' + item + '`, ' +
                        'case when `' + target_col + '` == 0 then Counts else 0 end as Count_0, ' +
                        'case when `' + target_col + '` == 1 then Counts else 0 end as Count_1 ' +
                        'from b')

    spark.catalog.dropTempView('b')
    # new_df2.show()
    new_df2.registerTempTable('c')
    # exit(0)

    new_df3 = spark.sql('SELECT `' + item + '`, SUM(Count_0) AS Count_0, ' +
                        'SUM(Count_1) AS Count_1 FROM c GROUP BY `' + item + '`')

    spark.catalog.dropTempView('c')
    # new_df3.show()
    # exit(0)

    new_df3.registerTempTable('d')

    # SQL DF Experiment
    new_df4 = spark.sql('Select `' + item + '` as bucketed_col_of_source, '
                        'Count_0/(select sum(d.Count_0) as sum from d) as Prop_0, ' +
                        'Count_1/(select sum(d.Count_1) as sum from d) as Prop_1 from d')

    spark.catalog.dropTempView('d')
    # new_df4.show()
    # exit(0)
    new_df4.registerTempTable('e')

    new_df5 = spark.sql('Select *, case when log(e.Prop_0/e.Prop_1) IS NULL then 0 '
                        'else log(e.Prop_0/e.Prop_1) end as WoE from e')

    spark.catalog.dropTempView('e')

    # print('Problem starts here: ')
    # new_df5.show()

    new_df5.registerTempTable('WoE_table')

    joined_Train_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                                '_WoE` from a bucketed inner join WoE_table on bucketed.`' + item +
                                '` = WoE_table.bucketed_col_of_source')

    # joined_Train_DF.show()
    joined_Test_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                               '_WoE` from test_data bucketed inner join WoE_table on bucketed.`' + item +
                               '` = WoE_table.bucketed_col_of_source')

    if validation:
        joined_Validation_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                                         '_WoE` from validation_data bucketed inner join WoE_table '
                                         'on bucketed.`' + item + '` = WoE_table.bucketed_col_of_source')
        WoE_Validation_DF = joined_Validation_DF

    spark.catalog.dropTempView('WoE_table')

    WoE_Train_DF = joined_Train_DF
    WoE_Test_DF = joined_Test_DF
    col_len = len(categorical_col)
    if col_len > 1:
        WoE_Train_DF.registerTempTable('a')
        WoE_Test_DF.registerTempTable('test_data')
        if validation:
            # print('got inside...')
            WoE_Validation_DF.registerTempTable('validation_data')

Any help?

Thanks,
Aakash.
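
Most of the time in the loop above goes into running several SQL passes and
temp-view registrations per column. The same WoE can be computed with one
aggregation per column plus a broadcast join back onto the data, since the
per-level lookup table is tiny. A rough sketch (assumptions: a DataFrame df,
the categorical column name in cat_col, and a 0/1 target column name in
target_col):

from pyspark.sql import functions as F

def add_woe_column(df, cat_col, target_col):
    # counts of target 0/1 per level of the categorical column
    counts = (df.groupBy(cat_col)
                .agg(F.sum(F.when(F.col(target_col) == 0, 1).otherwise(0)).alias("cnt_0"),
                     F.sum(F.when(F.col(target_col) == 1, 1).otherwise(0)).alias("cnt_1")))

    totals = counts.agg(F.sum("cnt_0").alias("tot_0"),
                        F.sum("cnt_1").alias("tot_1")).collect()[0]

    woe = counts.withColumn(
        "WoE",
        F.when((F.col("cnt_0") == 0) | (F.col("cnt_1") == 0), F.lit(0.0))
         .otherwise(F.log((F.col("cnt_0") / float(totals["tot_0"])) /
                          (F.col("cnt_1") / float(totals["tot_1"])))))

    # the lookup table has one row per level, so broadcast it instead of shuffling df
    return (df.join(F.broadcast(woe.select(cat_col, "WoE")), on=cat_col, how="inner")
              .withColumnRenamed("WoE", cat_col + "_WoE"))

The same WoE lookup table can be reused for the test and validation DataFrames
so that all splits get identical encodings.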


Spark AsyncEventQueue doubt

2018-05-27 Thread Aakash Basu
Hi,

I'm getting the below ERROR and WARN when running a little heavy
calculation on a dataset -

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> 2018-05-27 12:51:11 ERROR AsyncEventQueue:70 - Dropping event from queue
> appStatus. This likely means one of the listeners is too slow and cannot
> keep up with the rate at which tasks are being started by the scheduler.
> 2018-05-27 12:51:11 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Thu Jan
> 01 05:30:00 IST 1970.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:52:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:51:11 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:53:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:52:14 IST 2018.
> 2018-05-27 12:54:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:53:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:55:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:54:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:56:15 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:55:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:57:32 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:56:15 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:58:32 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:57:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:59:33 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:58:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:00:34 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:59:33 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:01:35 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 13:00:34 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:02:36 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 13:01:35 IST 2018.
>

Even though my job is not failing but why am I getting these?

Thanks,
Aakash.


[Spark SQL] Efficiently calculating Weight of Evidence in PySpark

2018-06-01 Thread Aakash Basu
Hi guys,

Can anyone please let me know if you've any clue on this problem I posted
in StackOverflow -

https://stackoverflow.com/questions/50638911/how-to-efficiently-calculate-woe-in-pyspark

Thanks,
Aakash.


Re: Append In-Place to S3

2018-06-02 Thread Aakash Basu
As Jay correctly suggested, if you're joining to filter out the existing rows
then overwrite; a plain append on its own won't remove dups.

I think, in this scenario, just changing it to write.mode('overwrite') would
do the job, because you're already reading the old data.


On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:

> Hi Jay,
>
> Thanks for your response. Are you saying to append the new data and then
> remove the duplicates to the whole data set afterwards overwriting the
> existing data set with new data set with appended values? I will give that
> a try.
>
> Cheers,
> Ben
>
> On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:
>
>> Benjamin,
>>
>> The append will append the "new" data to the existing data without removing
>> the duplicates. You would need to overwrite the file every time if you need
>> unique values.
>>
>> Thanks,
>> Jayadeep
>>
>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>>
>>> I have a situation where I trying to add only new rows to an existing
>>> data set that lives in S3 as gzipped parquet files, looping and appending
>>> for each hour of the day. First, I create a DF from the existing data, then
>>> I use a query to create another DF with the data that is new. Here is the
>>> code snippet.
>>>
>>> df = spark.read.parquet(existing_data_path)
>>> df.createOrReplaceTempView(‘existing_data’)
>>> new_df = spark.read.parquet(new_data_path)
>>> new_df.createOrReplaceTempView(’new_data’)
>>> append_df = spark.sql(
>>> """
>>> WITH ids AS (
>>> SELECT DISTINCT
>>> source,
>>> source_id,
>>> target,
>>> target_id
>>> FROM new_data i
>>> LEFT ANTI JOIN existing_data im
>>> ON i.source = im.source
>>> AND i.source_id = im.source_id
>>> AND i.target = im.target
>>> AND i.target = im.target_id
>>> """
>>> )
>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>>> compression='gzip’)
>>>
>>>
>>> I thought this would append new rows and keep the data unique, but I am
>>> see many duplicates. Can someone help me with this and tell me what I am
>>> doing wrong?
>>>
>>> Thanks,
>>> Ben
>>>
>>
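
A sketch of the overwrite-based variant being discussed (hypothetical paths and
key columns, not the poster's exact code): dedupe the incoming rows against the
existing ones, union the two, and write the result with overwrite to a staging
location, since lazily overwriting the very path that is still being read is
unsafe.

existing = spark.read.parquet(existing_data_path)
incoming = spark.read.parquet(new_data_path)

key_cols = ["source", "source_id", "target", "target_id"]

# keep only incoming rows whose key is not already present
only_new = incoming.join(existing.select(key_cols).distinct(),
                         on=key_cols, how="left_anti")

result = existing.unionByName(only_new)   # Spark 2.3+; plain union() works if column order matches

# write to a staging path and swap it in, rather than overwriting the input in place
result.write.mode("overwrite").parquet(staging_path)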


Fundamental Question on Spark's distribution

2018-06-07 Thread Aakash Basu
Hi all,

*Query 1)*

Need serious help! I'm running feature engineering of different types on a
dataset and trying to benchmark it by tweaking different types of Spark
properties.

I don't know where it is going wrong: a single machine is working faster than
a 3-node cluster, even though most of the operations in the code are
distributed.

The log I collected by running in different ways is -

Remote Spark Benchmarking (4 node cluster, 1 driver, 3 workers) -

Cluster details: 12 GB RAM, 6 cores each.

Medium data -> 1,00,000 sample (0.1 million rows) [Data placed in Local
File System, same path, same data on all worker nodes]
Runs -
1) Time Taken for the Feature Engineering Pipeline to finish:
482.20375990867615 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G
2) Time Taken for the Feature Engineering Pipeline to finish:
467.3759717941284 secs.; --num-executors 10 --executor-cores 6
--executor-memory 11G
3) Time Taken for the Feature Engineering Pipeline to finish:
459.885710477829 secs.; --num-executors 3 --executor-cores 6
--executor-memory 8G
4) Time Taken for the Feature Engineering Pipeline to finish:
476.61902809143066 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.memory.fraction=0.2
5) Time Taken for the Feature Engineering Pipeline to finish:
575.9314386844635 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.default.parallelism=200

Medium data -> 1,00,000 sample (0.1 million rows) [Data placed in Local
File System]
1) Time Taken for the Feature Engineering Pipeline to finish:
594.1818737983704 secs.
2) Time Taken for the Feature Engineering Pipeline to finish:
528.6015181541443 secs. (on single driver node [local])
3) Time Taken for the Feature Engineering Pipeline to finish:
323.6546362755467 secs. (on my laptop - 16GB RAM and 8 Cores).

*Query 2)*

The below is the event timeline of the same code taken from the Spark UI. Can
you provide some insight on why there are two big gaps between the parallel
tasks? Does it mean that, during that time, no operation is happening? I am
kind of new to Spark UI monitoring; can anyone suggest other aspects which
need to be monitored to optimize further?





Thanks,
Aakash.
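
One way to make end-to-end numbers like these easier to interpret is to time
the individual pipeline stages, forcing materialisation with an action so that
lazy evaluation doesn't push all the cost into the final step. A small helper
(a sketch; build_features and raw_df are hypothetical names):

import time

def timed_stage(label, df):
    """Force the DataFrame to materialise and report the wall-clock time of this stage."""
    start = time.time()
    n = df.count()   # any action works; count() is easy to reason about
    print("%s: %d rows in %.1f secs" % (label, n, time.time() - start))
    return df

features_df = timed_stage("feature engineering", build_features(raw_df))

The extra count() does add work of its own, so this is for diagnosis rather
than production runs; comparing per-stage numbers between the laptop and the
cluster shows whether the gap comes from compute, shuffle, or scheduling
overhead.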


Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-20 Thread Aakash Basu
Hi Kazuaki,

It would be really difficult to produce a small stand-alone program to
reproduce this problem because I'm running a big feature engineering pipeline
where I derive a lot of variables from the existing ones, which explodes the
size of the table many fold. Then, when I do any kind of join, this error
shoots up.

I tried with spark.sql.codegen.wholeStage=false, but that fails the entire
program rather than running it with less optimized code.

Any suggestion on how I can proceed towards a JIRA entry for this?

Thanks,
Aakash.

On Wed, Jun 20, 2018 at 9:41 PM, Kazuaki Ishizaki 
wrote:

> Spark 2.3 tried to split a large generated Java methods into small methods
> as possible. However, this report may remain places that generates a large
> method.
>
> Would it be possible to create a JIRA entry with a small stand alone
> program that can reproduce this problem? It would be very helpful that the
> community will address this problem.
>
> Best regards,
> Kazuaki Ishizaki
>
>
>
> From:vaquar khan 
> To:Eyal Zituny 
> Cc:Aakash Basu , user <
> user@spark.apache.org>
> Date:2018/06/18 01:57
> Subject:Re: [Help] Codegen Stage grows beyond 64 KB
> --
>
>
>
> Totally agreed with Eyal .
>
> The problem is that when the Java code Catalyst generates for DataFrame and
> Dataset programs is compiled into Java bytecode, no single method may reach
> 64 KB of bytecode; this is a limitation of the Java class file format, and
> exceeding it raises this exception.
>
> To avoid hitting this restriction, Spark's Catalyst tries to split generated
> methods that are likely to exceed 64 KB of bytecode into multiple smaller
> methods.
>
> Use persist or any other logical separation in pipeline.
>
> Regards,
> Vaquar khan
>
> On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny <*eyal.zit...@equalum.io*
> > wrote:
> Hi Akash,
> such errors might appear in large spark pipelines, the root cause is a
> 64kb jvm limitation.
> the reason that your job isn't failing at the end is due to spark fallback
> - if code gen is failing, spark compiler will try to create the flow
> without the code gen (less optimized)
> if you do not want to see this error, you can either disable code gen
> using the flag:  spark.sql.codegen.wholeStage= "false"
> or you can try to split your complex pipeline into several spark flows if
> possible
>
> hope that helps
>
> Eyal
>
> On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi,
>
> I already went through it, that's one use case. I've a complex and very
> big pipeline of multiple jobs under one spark session. Not getting, on how
> to solve this, as it is happening over Logistic Regression and Random
> Forest models, which I'm just using from Spark ML package rather than doing
> anything by myself.
>
> Thanks,
> Aakash.
>
> On Sun 17 Jun, 2018, 8:21 AM vaquar khan, <*vaquar.k...@gmail.com*
> > wrote:
> Hi Akash,
>
> Please check stackoverflow.
>
>
> *https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe*
> <https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe>
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi guys,
>
> I'm getting an error when I'm feature engineering on 30+ columns to create
> about 200+ columns. It is not failing the job, but the ERROR shows. I want
> to know how can I avoid this.
>
> Spark - 2.3.1
> Python - 3.6
>
> Cluster Config -
> 1 Master - 32 GB RAM, 16 Cores
> 4 Slaves - 16 GB RAM, 8 Cores
>
>
> Input data - 8 partitions of parquet file with snappy compression.
>
> My Spark-Submit -> spark-submit --master spark://*192.168.60.20:7077*
> <http://192.168.60.20:7077>--num-executors 4 --executor-cores 5
> --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf
> spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py
> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>
>
> Stack-Trace below -
>
> ERROR CodeGenerator:91 - failed to compile: 
> o
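
One note on the spark-submit shown earlier in this thread: spark.sql.codegen
appears to be the old Spark 1.x switch, while in 2.x the relevant property is
spark.sql.codegen.wholeStage. A minimal way to try the non-codegen fallback
path that Eyal mentions (a sketch, hypothetical app name):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("codegen-fallback-test")                 # hypothetical name
         .config("spark.sql.codegen.wholeStage", "false")  # disable whole-stage codegen
         .getOrCreate())

# it is a runtime SQL conf, so it can also be flipped mid-session:
spark.conf.set("spark.sql.codegen.wholeStage", "false")

This usually silences the "grows beyond 64 KB" compile error at the cost of
the slower non-generated execution path; with it left on, Spark 2.3 should
fall back automatically when compilation fails, so the ERROR is noisy rather
than fatal.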

G1GC vs ParallelGC

2018-06-20 Thread Aakash Basu
Hi guys,

I just wanted to know why ParallelGC (*--conf
"spark.executor.extraJavaOptions=-XX:+UseParallelGC"*) makes a very long Spark
ML pipeline run faster than when I set G1GC (*--conf
"spark.executor.extraJavaOptions=-XX:+UseG1GC"*), even though the Spark
community suggests G1GC to be much better than ParallelGC.

Any pointers?

Thanks,
Aakash.


Way to avoid CollectAsMap in RandomForest

2018-06-20 Thread Aakash Basu
Hi,

I'm running the RandomForest model from the Spark ML API on medium-sized data
(2.25 million rows and 60 features). Most of my time goes into the
collectAsMap inside RandomForest, but I have no option to avoid it as it is
internal to the API.

Is there a way to cut short my end-to-end runtime?



Thanks,
Aakash.
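
For what it's worth, the per-iteration collect in tree training gathers the
chosen splits for the nodes being grown, and the work per iteration roughly
scales with the number of trees, their depth, and maxBins, so those are the
usual handles, along with caching the assembled input so each iteration
doesn't recompute it. A sketch with hypothetical parameter values and an
assumed assembled_df holding "features" and "label" columns:

from pyspark import StorageLevel
from pyspark.ml.classification import RandomForestClassifier

train = assembled_df.persist(StorageLevel.MEMORY_AND_DISK)
train.count()   # materialise the cache before fitting

rf = RandomForestClassifier(
    featuresCol="features", labelCol="label",
    numTrees=100,         # fewer trees -> less aggregation per iteration
    maxDepth=8,
    maxBins=32,           # large maxBins inflates the statistics being aggregated
    subsamplingRate=0.7,  # train each tree on a sample of the rows
)
model = rf.fit(train)

Whether these values are acceptable depends on the accuracy target, so treat
them as starting points for a grid rather than recommendations.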


[G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Hi,

I used the below in the Spark Submit for using G1GC -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"

Now, I want to add *-XX:-ResizePLAB* to the G1GC options to avoid the
performance degradation caused by a large number of thread communications.

How do I do it? I tried submitting in a similar fashion -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
"spark.executor.extraJavaOptions=*-XX: -ResizePLAB*", but it doesn't work.

Thanks,
Aakash.
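
When the same --conf key is given twice, only one value survives (they are not
merged), so both JVM flags have to go into a single
spark.executor.extraJavaOptions string, and the flag itself is written
-XX:-ResizePLAB with no space after the colon, e.g.
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB".
The same setting expressed in code (a sketch, assuming the session is built in
the driver script; the app name is hypothetical):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("g1gc-resizeplab-test")
         .config("spark.executor.extraJavaOptions",
                 "-XX:+UseG1GC -XX:-ResizePLAB")   # one space-separated string
         .getOrCreate())

print(spark.conf.get("spark.executor.extraJavaOptions"))   # verify what the executors will get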


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
We aren't using Oozie or anything similar. Moreover, the end-to-end job will
be exactly the same, but the data will be extremely different (number of
continuous and categorical columns, vertical size, horizontal size, etc.).
Hence, if there were a way to calculate the parameters so that we can simply
look at the data and derive the respective configuration, it would be great.

On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:

> Don’t do this in your job. Create for different types of jobs different
> jobs and orchestrate them using oozie or similar.
>
> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>
> Hi,
>
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
>
> I'm using the below parameters from which I know the first chunk is
> cluster dependent and the second chunk is data/code dependent.
>
> --num-executors 4
> --executor-cores 5
> --executor-memory 10G
> --driver-cores 5
> --driver-memory 25G
>
>
> --conf spark.sql.shuffle.partitions=100
> --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>
> I've come upto these values depending on my R on the properties and the
> issues I faced and hence the handles.
>
> My ask here is -
>
> *1) How can I infer, using some formula or a code, to calculate the below
> chunk dependent on the data/code?*
> *2) What are the other usable properties/configurations which I can use to
> shorten my job runtime?*
>
> Thanks,
> Aakash.
>
>


Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
Hi,

Cluster - 5 node (1 Driver and 4 workers)
Driver Config: 16 cores, 32 GB RAM
Worker Config: 8 cores, 16 GB RAM

I'm using the below parameters from which I know the first chunk is cluster
dependent and the second chunk is data/code dependent.

--num-executors 4
--executor-cores 5
--executor-memory 10G
--driver-cores 5
--driver-memory 25G


--conf spark.sql.shuffle.partitions=100
--conf spark.driver.maxResultSize=2G
--conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
--conf spark.scheduler.listenerbus.eventqueue.capacity=2

I've come up to these values based on my R&D on the properties and the issues
I faced, hence these handles.

My ask here is -

*1) How can I infer, using some formula or code, the data/code-dependent chunk
of parameters above?*
*2) What are the other usable properties/configurations which I can use to
shorten my job runtime?*

Thanks,
Aakash.
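
There is no exact formula, but the usual rule-of-thumb sizing (leave a core and
some memory on each node for the OS, keep executors at around 5 cores, and
reserve roughly 10% of executor memory for overhead) can at least be scripted
so it is derived from the cluster description instead of guessed. The numbers
below are heuristics, not Spark defaults:

def suggest_layout(worker_nodes, cores_per_node, mem_per_node_gb,
                   cores_per_executor=5, os_reserved_cores=1,
                   os_reserved_gb=1, overhead_fraction=0.10):
    """Rule-of-thumb executor sizing; returns (num_executors, executor_cores, executor_memory_gb)."""
    usable_cores = cores_per_node - os_reserved_cores
    execs_per_node = max(1, usable_cores // cores_per_executor)
    num_executors = worker_nodes * execs_per_node
    mem_per_exec = (mem_per_node_gb - os_reserved_gb) / float(execs_per_node)
    executor_memory = int(mem_per_exec * (1 - overhead_fraction))
    return num_executors, cores_per_executor, executor_memory

# the 4-worker cluster described above: 8 cores / 16 GB per worker
print(suggest_layout(worker_nodes=4, cores_per_node=8, mem_per_node_gb=16))
# -> (4, 5, 13), i.e. roughly --num-executors 4 --executor-cores 5 --executor-memory 13G

The genuinely data-dependent knob is spark.sql.shuffle.partitions: a common
starting point is 2-3x the total executor cores (here roughly 40-60), scaled
up if individual shuffle partitions grow beyond a few hundred MB.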


PySpark 2.1 Not instantiating properly

2017-10-20 Thread Aakash Basu
Hi all,

I have Spark 2.1 installed on my laptop, where I used to run all my programs.
PySpark wasn't used for around 1 month, and after starting it now, I'm getting
this exception (I've tried the solutions I could find on Google, but to no
avail).

Specs: Spark 2.1.1, Python 3.6, HADOOP 2.7, Windows 10 Pro, 64 Bits.


py4j.protocol.Py4JJavaError: An error occurred while calling
o27.sessionState.
: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveSessionState':
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:981)
at
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
at
org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:978)
... 13 more
Caused by: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveExternalCatalog':
at
org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:169)
at
org.apache.spark.sql.internal.SharedState.(SharedState.scala:86)
at
org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at
org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at
org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:101)
at
org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:100)
at
org.apache.spark.sql.internal.SessionState.(SessionState.scala:157)
at
org.apache.spark.sql.hive.HiveSessionState.(HiveSessionState.scala:32)
... 18 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:166)
... 26 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:358)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:262)
at
org.apache.spark.sql.hive.HiveExternalCatalog.(HiveExternalCatalog.scala:66)
... 31 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root
scratch dir: /tmp/hive on HDFS should be writable. Current permissions are:
rw-rw-rw-
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at
org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:188)
... 39 more
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on
HDFS should be writable. Current permissions are: rw-rw-rw-
at

Fwd: PySpark 2.1 Not instantiating properly

2017-10-20 Thread Aakash Basu
Hi,

Any help please? What can be the issue?

Thanks,
Aakash.
-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Fri, Oct 20, 2017 at 1:00 PM
Subject: PySpark 2.1 Not instantiating properly
To: user <user@spark.apache.org>


Hi all,

I have Spark 2.1 installed in my laptop where I used to run all my
programs. PySpark wasn't used for around 1 month, and after starting it
now, I'm getting this exception (I've tried the solutions I could find on
Google, but to no avail).

Specs: Spark 2.1.1, Python 3.6, HADOOP 2.7, Windows 10 Pro, 64 Bits.


py4j.protocol.Py4JJavaError: An error occurred while calling
o27.sessionState.
: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveSessionState':
at org.apache.spark.sql.SparkSession$.org$apache$
spark$sql$SparkSession$$reflect(SparkSession.scala:981)
at org.apache.spark.sql.SparkSession.sessionState$
lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(
SparkSession.scala:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(
ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.
java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.SparkSession$.org$apache$
spark$sql$SparkSession$$reflect(SparkSession.scala:978)
... 13 more
Caused by: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveExternalCatalog':
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
sql$internal$SharedState$$reflect(SharedState.scala:169)
at org.apache.spark.sql.internal.SharedState.(
SharedState.scala:86)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
SparkSession.scala:101)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
SparkSession.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sharedState$
lzycompute(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession.sharedState(
SparkSession.scala:100)
at org.apache.spark.sql.internal.SessionState.(
SessionState.scala:157)
at org.apache.spark.sql.hive.HiveSessionState.(
HiveSessionState.scala:32)
... 18 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
sql$internal$SharedState$$reflect(SharedState.scala:166)
... 26 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.
createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
HiveUtils.scala:358)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
HiveUtils.scala:262)
at org.apache.spark.sql.hive.HiveExternalCatalog.(
HiveExternalCatalog.scala:66)
... 31 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root
scratch dir: /tmp/hive on HDFS should be writable. Current permissions are:
rw-rw-rw-
at org.apache.hadoop.hive.ql.session.Sessio

Re: PySpark 2.1 Not instantiating properly

2017-10-20 Thread Aakash Basu
Hey Marco/Jagat,

As I informed you earlier, I've already done those basic checks and
permission changes.

e.g. D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive, but to no avail. It
still throws the same error. In the first place, I do not understand how the
permissions changed on their own, without any manual change.

To Jagat's question - "Do you have winutils in your system relevant for
your system." - how do I check that? I did not find a winutils build specific
to my OS/bitness.

Any other solutions? Should I download a fresh Spark zip and redo all the
configuration steps? The chmod just isn't taking effect (and it reports no
errors when I submit the above command).


Thanks,
Aakash.
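
One more hedged workaround, not something suggested in the thread: if the job
doesn't actually need Hive, the Hive catalog (and with it the /tmp/hive
scratch-dir check) can be skipped by building the session with the in-memory
catalog. spark.sql.catalogImplementation is an internal setting, so treat this
as an assumption to verify against the Spark version in use; the app name and
sample DataFrame are illustrative.

from pyspark.sql import SparkSession

# Assumption: Hive support is not needed, so use the in-memory catalog and
# avoid HiveSessionState (and the /tmp/hive writability check) entirely.
spark = (SparkSession.builder
         .appName("no-hive-catalog")
         .config("spark.sql.catalogImplementation", "in-memory")
         .getOrCreate())

spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"]).show()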

On Fri, Oct 20, 2017 at 9:53 PM, Jagat Singh <jagatsi...@gmail.com> wrote:

> Do you have winutils in your system relevant for your system.
>
> This SO post has related information: https://stackoverflow.com/questions/34196302/the-root-scratch-dir-tmp-hive-on-hdfs-should-be-writable-current-permissions
>
>
>
> On 21 October 2017 at 03:16, Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> Did you build Spark or download the zip?
>> I remember having a similar issue... either you have to give write permission to
>> your /tmp directory or there's a Spark config you need to override.
>> This error is not 2.1 specific... let me get home and check my configs.
>> I think I amended my /tmp permissions via xterm instead of the Control Panel.
>>
>> Hth
>>  Marco
>>
>>
>> On Oct 20, 2017 8:31 AM, "Aakash Basu" <aakash.spark@gmail.com>
>> wrote:
>>
>> Hi all,
>>
>> I have Spark 2.1 installed in my laptop where I used to run all my
>> programs. PySpark wasn't used for around 1 month, and after starting it
>> now, I'm getting this exception (I've tried the solutions I could find on
>> Google, but to no avail).
>>
>> Specs: Spark 2.1.1, Python 3.6, HADOOP 2.7, Windows 10 Pro, 64 Bits.
>>
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o27.sessionState.
>> : java.lang.IllegalArgumentException: Error while instantiating
>> 'org.apache.spark.sql.hive.HiveSessionState':
>> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$Spar
>> kSession$$reflect(SparkSession.scala:981)
>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(Sp
>> arkSession.scala:110)
>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.
>> scala:109)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.jav
>> a:357)
>> at py4j.Gateway.invoke(Gateway.java:280)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.j
>> ava:132)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:214)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(Native
>> ConstructorAccessorImpl.java:62)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(De
>> legatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:4
>> 23)
>> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$Spar
>> kSession$$reflect(SparkSession.scala:978)
>> ... 13 more
>> Caused by: java.lang.IllegalArgumentException: Error while instantiating
>> 'org.apache.spark.sql.hive.HiveExternalCatalog':
>> at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
>> sql$internal$SharedState$$reflect(SharedState.scala:169)
>> at org.apache.spark.sql.internal.SharedState.(SharedState
>> .scala:86)
>> at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.app
>> ly(SparkSession.scala:101)
>> at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.app
>> ly(SparkSession.scala:101)
>> at scala.Option.getOrElse(Option.scala:121)
>> at org.apache.spa

Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Thanks a ton!

On Tue, Jul 3, 2018 at 6:26 PM, Vadim Semenov  wrote:

> As with typical `JAVA_OPTS`, you need to pass everything as a single parameter:
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"
>
> Also, you have an extra space in the parameter; there should be no space
> after the colon symbol.
> On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu 
> wrote:
> >
> > Hi,
> >
> > I used the below in the Spark Submit for using G1GC -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
> >
> > Now, I want to use the G1GC option -XX: -ResizePLAB to avoid
> > performance degradation caused by a large number of thread communications.
> >
> > How do I do it? I tried submitting in a similar fashion -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
> >
> > Thanks,
> > Aakash.
>
>
>
> --
> Sent from my iPhone
>
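
For reference, a minimal sketch of the same setting expressed programmatically,
assuming the conf is set before the session is created (the app name is an
illustrative assumption): both JVM flags live in one
spark.executor.extraJavaOptions string, exactly as in the single --conf above.

from pyspark.sql import SparkSession

# Sketch: both G1GC flags passed as ONE extraJavaOptions value, mirroring
# the single --conf shown in the reply above.
spark = (SparkSession.builder
         .appName("g1gc-example")
         .config("spark.executor.extraJavaOptions",
                 "-XX:+UseG1GC -XX:-ResizePLAB")
         .getOrCreate())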


Spark MLLib vs. SciKitLearn

2018-01-19 Thread Aakash Basu
Hi all,

I am totally new to ML APIs. I'm trying to get the *ROC_Curve* for model
evaluation on both *scikit-learn* and *PySpark MLlib*, but I cannot find any
API for ROC curve calculation for binary classification in Spark MLlib.

The code snippets below sit inside a wrapper which creates the respective
dataframe from the source data with two columns (as attached).

I want to achieve the same result as the Python code, but in Spark, to get the
roc_curve. Is there any MLlib API to achieve the same?

Python sklearn Code -

def roc(self, y_true, y_pred):
    df_a = self._df.copy()
    values_1_tmp = df_a[y_true].values
    values_1_tmp2 = values_1_tmp[~np.isnan(values_1_tmp)]
    values_1 = values_1_tmp2.astype(int)
    values_2_tmp = df_a[y_pred].values
    values_2_tmp2 = values_2_tmp[~np.isnan(values_2_tmp)]
    values_2 = values_2_tmp2.astype(int)
    specificity, sensitivity, thresholds = metrics.roc_curve(values_1, values_2, pos_label=2)
    # area_under_roc = metrics.roc_auc_score(values_1, values_2)
    print(sensitivity, specificity)
    return sensitivity, specificity

Result:

[ 0.  0.34138342  0.67412045  1.] [ 0.  0.33373458
0.67378875  1.]


PySpark Code -

def roc(self, y_true, y_pred):
    print('using pyspark df')
    df_a = self._df
    values_1 = list(df_a[y_true, y_pred].toPandas().values)
    new_list = [l.tolist() for l in values_1]

    double_list = []
    for myList in new_list:
        temp = []
        for item in myList:
            temp.append(float(item))
        double_list.append(temp)

    new_rdd = self._sc.parallelize(double_list)
    metrics = BinaryClassificationMetrics(new_rdd)
    roc_calc = metrics.areaUnderROC
    print(roc_calc)
    print(type(roc_calc))
    return 1


Please help.

Thanks,
Aakash.
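
In case it helps: pyspark.mllib.evaluation.BinaryClassificationMetrics only
exposes the areas (areaUnderROC / areaUnderPR) in PySpark, as far as I can
tell, so below is a minimal sketch of computing ROC curve points yourself by
sweeping a threshold grid with plain DataFrame aggregations. The column names
("label", "score"), the threshold grid, and the input DataFrame are assumptions
for illustration; this is not an official MLlib API. If the model is a
pyspark.ml LogisticRegression, its training summary also exposes a roc
DataFrame, which may already be enough.

from pyspark.sql import functions as F

# Sketch: ROC curve points from a DataFrame of (label, score) rows.
# "label" is assumed to be 0/1 and "score" a probability in [0, 1].
def roc_points(df, label_col="label", score_col="score", num_thresholds=50):
    totals = df.agg(F.sum(F.col(label_col)).alias("pos"),
                    F.count(F.lit(1)).alias("n")).collect()[0]
    pos = float(totals["pos"])
    neg = float(totals["n"]) - pos
    fpr, tpr = [], []
    for i in range(num_thresholds + 1):
        t = i / float(num_thresholds)
        row = df.agg(
            F.sum(F.when((F.col(score_col) >= t) & (F.col(label_col) == 1), 1)
                   .otherwise(0)).alias("tp"),
            F.sum(F.when((F.col(score_col) >= t) & (F.col(label_col) == 0), 1)
                   .otherwise(0)).alias("fp")).collect()[0]
        tpr.append(row["tp"] / pos if pos else 0.0)
        fpr.append(row["fp"] / neg if neg else 0.0)
    return fpr, tpr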


Re: Spark MLLib vs. SciKitLearn

2018-01-20 Thread Aakash Basu
Any help on the below?

On 19-Jan-2018 7:12 PM, "Aakash Basu" <aakash.spark@gmail.com> wrote:

> Hi all,
>
> I am totally new to ML APIs. Trying to get the *ROC_Curve* for Model
> Evaluation on both *ScikitLearn* and *PySpark MLLib*. I do not find any
> API for ROC_Curve calculation for BinaryClassification in SparkMLLib.
>
> The codes below have a wrapper function which is creating the respective
> dataframe from the source data with two columns which is as attached.
>
> I want to achieve the same result as Python code in the Spark to get the
> roc_curve. Is there any API from MLLib side to achieve the same?
>
> Python sklearn Code -
>
> def roc(self, y_true, y_pred):
>     df_a = self._df.copy()
>     values_1_tmp = df_a[y_true].values
>     values_1_tmp2 = values_1_tmp[~np.isnan(values_1_tmp)]
>     values_1 = values_1_tmp2.astype(int)
>     values_2_tmp = df_a[y_pred].values
>     values_2_tmp2 = values_2_tmp[~np.isnan(values_2_tmp)]
>     values_2 = values_2_tmp2.astype(int)
>     specificity, sensitivity, thresholds = metrics.roc_curve(values_1, values_2, pos_label=2)
>     # area_under_roc = metrics.roc_auc_score(values_1, values_2)
>     print(sensitivity, specificity)
>     return sensitivity, specificity
>
> Result:
>
> [ 0.  0.34138342  0.67412045  1.] [ 0.
> 0.33373458  0.67378875  1.]
>
>
> PySpark Code -
>
> def roc(self, y_true, y_pred):
>     print('using pyspark df')
>     df_a = self._df
>     values_1 = list(df_a[y_true, y_pred].toPandas().values)
>     new_list = [l.tolist() for l in values_1]
>
>     double_list = []
>     for myList in new_list:
>         temp = []
>         for item in myList:
>             temp.append(float(item))
>         double_list.append(temp)
>
>     new_rdd = self._sc.parallelize(double_list)
>     metrics = BinaryClassificationMetrics(new_rdd)
>     roc_calc = metrics.areaUnderROC
>     print(roc_calc)
>     print(type(roc_calc))
>     return 1
>
>
> Please help.
>
> Thanks,
> Aakash.
>


Is there any Spark ML or MLLib API for GINI for Model Evaluation? Please help! [EOM]

2018-01-21 Thread Aakash Basu



[Help] Converting a Python Numpy code into Spark using RDD

2018-01-21 Thread Aakash Basu
Hi,

How can I convert this Python NumPy code into Spark RDD operations, so that the
computation leverages Spark's distributed architecture for big data?

Code is as follows -

def gini(array):
    """Calculate the Gini coefficient of a numpy array."""
    array = array.flatten()  # all values are treated equally, arrays must be 1d
    if np.amin(array) < 0:
        array -= np.amin(array)  # values cannot be negative
    array += 0.001  # values cannot be 0
    array = np.sort(array)  # values must be sorted
    index = np.arange(1, array.shape[0] + 1)  # index per array element
    n = array.shape[0]  # number of array elements
    return ((np.sum((2 * index - n - 1) * array)) / (n * np.sum(array)))  # Gini coefficient




Thanks in adv,
Aakash.
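
A minimal sketch of a direct RDD translation, assuming the input is an RDD of
numeric values (the shift and the 0.001 epsilon mirror the NumPy version
above); the function name is illustrative, not an existing Spark API.

def gini_rdd(values_rdd):
    """Distributed Gini coefficient, mirroring the NumPy function above."""
    rdd = values_rdd.map(float)
    min_val = rdd.min()
    if min_val < 0:
        rdd = rdd.map(lambda x: x - min_val)   # values cannot be negative
    rdd = rdd.map(lambda x: x + 0.001)         # values cannot be 0
    n = rdd.count()
    total = rdd.sum()
    # sortBy gives a globally sorted RDD; zipWithIndex yields the 0-based
    # sorted rank, shifted to 1-based to match np.arange(1, n + 1)
    indexed = rdd.sortBy(lambda x: x).zipWithIndex()
    weighted = indexed.map(lambda vi: (2 * (vi[1] + 1) - n - 1) * vi[0]).sum()
    return weighted / (n * total)

# e.g. gini_rdd(sc.parallelize([1.0, 2.0, 3.0, 4.0]))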


[Doubt] GridSearch for Hyperparameter Tuning in Spark

2018-01-30 Thread Aakash Basu
Hi,

Is there any available PySpark ML or MLlib API for grid search, similar to
GridSearchCV from sklearn's model_selection?

I found this - https://spark.apache.org/docs/2.2.0/ml-tuning.html - but it
covers cross-validation and train-validation split for hyperparameter tuning,
not a pure grid search.

Any help?

Thanks,
Aakash.
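
There is no exact GridSearchCV twin, but as a hedged sketch, ParamGridBuilder
plus CrossValidator (or TrainValidationSplit) from pyspark.ml.tuning is the
closest built-in route; the estimator, the grid values, and train_df below are
illustrative assumptions.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression(featuresCol="features", labelCol="label")

# The parameter grid plays the role of GridSearchCV's param_grid
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 1.0])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                    numFolds=3)

# cv_model = cv.fit(train_df)   # train_df: assumed DataFrame with features/label
# best_model = cv_model.bestModel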


Re: Passing an array of more than 22 elements in a UDF

2017-12-25 Thread Aakash Basu
What's the advantage of using that specific Scala version for this? Please shed
some light on it.

On Mon, Dec 25, 2017 at 6:51 AM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Or use it with Scala 2.11?
>
> --
> *From:* ayan guha <guha.a...@gmail.com>
> *Sent:* Friday, December 22, 2017 3:15:14 AM
> *To:* Aakash Basu
> *Cc:* user
> *Subject:* Re: Passing an array of more than 22 elements in a UDF
>
> Hi, I think you are on the right track. You can stuff all your params into a
> suitable data structure like an array or dict and pass this structure as a
> single param to your UDF.
>
> On Fri, 22 Dec 2017 at 2:55 pm, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using Spark 2.2 using Java, can anyone please suggest me how to take
>> more than 22 parameters in an UDF? I mean, if I want to pass all the
>> parameters as an array of integers?
>>
>> Thanks,
>> Aakash.
>>
> --
> Best Regards,
> Ayan Guha
>
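
Ayan's suggestion as a hedged PySpark sketch (the thread itself concerns the
Java API, but the idea is the same): collapse the many columns into one array
column and hand the UDF that single argument. The column names and the summing
logic below are illustrative assumptions.

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# A UDF that receives ONE argument: the whole array of parameters
sum_params = F.udf(lambda xs: int(sum(xs)), IntegerType())

# Pack every input column into a single array column, then call the UDF on it
# df = df.withColumn("params", F.array(*[F.col(c) for c in df.columns])) \
#        .withColumn("result", sum_params(F.col("params")))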


Passing an array of more than 22 elements in a UDF

2017-12-22 Thread Aakash Basu
Hi,

I am using Spark 2.2 with Java. Can anyone please suggest how to take
more than 22 parameters in a UDF? For instance, what if I want to pass all the
parameters as an array of integers?

Thanks,
Aakash.


Re: Best way to process this dataset

2018-06-19 Thread Aakash Basu
Georg, just asking: can Pandas handle such a big dataset, especially if that
data is then passed into any of the sklearn modules?

On Tue, Jun 19, 2018 at 10:35 AM, Georg Heiler 
wrote:

> use pandas or dask
>
> If you do want to use Spark, store the dataset as Parquet / ORC, and then
> continue to perform analytical queries on that dataset.
>
> Raymond Xie  schrieb am Di., 19. Juni 2018 um
> 04:29 Uhr:
>
>> I have a 3.6GB CSV dataset (4 columns, 100,150,807 rows); my environment
>> has a 20GB SSD hard disk and 2GB of RAM.
>>
>> The dataset comes with
>> User ID: 987,994
>> Item ID: 4,162,024
>> Category ID: 9,439
>> Behavior type ('pv', 'buy', 'cart', 'fav')
>> Unix Timestamp: span between November 25 to December 03, 2017
>>
>> I would like to hear any suggestions from you on how I should process the
>> dataset with my current environment.
>>
>> Thank you.
>>
>> **
>> *Sincerely yours,*
>>
>>
>> *Raymond*
>>
>
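
Georg's Parquet suggestion as a minimal sketch, assuming Spark is used; the
paths, reader options, and column name are illustrative assumptions: convert
the CSV once, then run the analytical queries against the columnar copy.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()

# One-time conversion; the schema is inferred here for brevity
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/data/user_behavior.csv"))          # assumed input path

df.write.mode("overwrite").parquet("/data/user_behavior.parquet")

# Later analytical queries hit the compressed, columnar copy instead of the CSV
events = spark.read.parquet("/data/user_behavior.parquet")
events.groupBy("behavior_type").count().show()  # assumed column name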


Re: Query on Profiling Spark Code

2018-07-31 Thread Aakash Basu
Okay, sure!

On Tue, Jul 31, 2018 at 1:06 PM, Patil, Prashasth <
prashasth.pa...@spglobal.com> wrote:

> Hi Aakash,
>
> On a related note, you may want to try SparkLens for profiling which is
> quite helpful in my opinion.
>
>
>
>
>
> -Prash
>
>
>
> *From:* Aakash Basu [mailto:aakash.spark@gmail.com]
> *Sent:* Tuesday, July 17, 2018 12:41 PM
> *To:* user
> *Subject:* Query on Profiling Spark Code
>
>
>
> Hi guys,
>
>
>
> I'm trying to profile my Spark code with cProfile and check where most of the
> time is spent. I found that most of the time is taken by some socket object,
> and I'm quite clueless as to where it is used.
>
>
>
> Can anyone shed some light on this?
>
>
>
>
>
> ncalls | tottime | percall | cumtime | percall | filename:lineno(function)
> 11789  | 479.8   | 0.0407  | 479.8   | 0.0407  | ~:0()
>
> Thanks,
>
> Aakash.
>
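
On the socket time: in cProfile output for a PySpark driver, heavy time inside
a socket read usually just means the Python driver is blocked waiting for the
JVM to finish jobs (py4j traffic), so the real work happens on the executors
rather than in driver-side Python. Below is a minimal sketch of looking at
worker-side time instead, using Spark's built-in Python profiler; the workload
and app name are illustrative assumptions.

from pyspark import SparkConf, SparkContext

# Enable the worker-side profiler before the SparkContext is created
conf = SparkConf().setAppName("profile-demo").set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1000000)).map(lambda x: x * x)  # illustrative workload
rdd.count()

sc.show_profiles()  # dumps per-RDD cProfile stats collected on the executors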


How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
Hi,

Just curious to know: how can we run a Principal Component Analysis on
streaming data in distributed mode? And if we can, is it mathematically
valid?

Has anyone done that before? Could you share your experience with it? Is
there any API Spark provides to do the same in Spark Streaming mode?

Thanks,
Aakash.


Re: How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
FYI

The relevant StackOverflow query on the same -
https://stackoverflow.com/questions/51610482/how-to-do-pca-with-spark-streaming-dataframe

On Tue, Jul 31, 2018 at 3:18 PM, Aakash Basu 
wrote:

> Hi,
>
> Just curious to know, how can we run a Principal Component Analysis on
> streaming data in distributed mode? If we can, is it mathematically valid
> enough?
>
> Have anyone done that before? Can you guys share your experience over it?
> Is there any API Spark provides to do the same on Spark Streaming mode?
>
> Thanks,
> Aakash.
>
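
As far as I know, Spark ML has no incremental PCA estimator, so a common
compromise, sketched below under assumptions (the paths, schema, column names,
and the acceptability of a periodically refitted projection are all
illustrative), is to fit pyspark.ml.feature.PCA on a static batch and apply the
fitted model's transform to the streaming DataFrame.

from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler

spark = SparkSession.builder.appName("streaming-pca-sketch").getOrCreate()

# 1) Fit PCA on a static batch of historical data (assumed path and columns)
history = spark.read.parquet("/data/history.parquet")
assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features")
pca_model = PCA(k=2, inputCol="features", outputCol="pca_features") \
    .fit(assembler.transform(history))

# 2) Apply the fitted projection to the stream; transform is stateless per row
stream = spark.readStream.schema(history.schema).parquet("/data/incoming/")
projected = pca_model.transform(assembler.transform(stream))

query = (projected.writeStream
         .format("console")
         .outputMode("append")
         .start())
# query.awaitTermination()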

