Re: How to populate all possible combination values in columns using Spark SQL

2020-05-09 Thread Aakash Basu
I know how to pivot, but how to aggregate and pivot and build those True
and False combination is the doubt.

On Fri 8 May, 2020, 1:31 PM Edgardo Szrajber,  wrote:

> Have you checked the pivot function?
> Bentzi
>
> Sent from Yahoo Mail on Android
> <https://go.onelink.me/107872968?pid=InProduct=Global_Internal_YGrowth_AndroidEmailSig__AndroidUsers_wl=ym_sub1=Internal_sub2=Global_YGrowth_sub3=EmailSignature>
>
> On Thu, May 7, 2020 at 22:46, Aakash Basu
>  wrote:
> Hi,
>
> I've updated the SO question with masked data, added year column and other
> requirement. Please take a look.
>
> Hope this helps in solving the problem.
>
> Thanks and regards,
> AB
>
> On Thu 7 May, 2020, 10:59 AM Sonal Goyal,  wrote:
>
> As mentioned in the comments on SO, can you provide a (masked) sample of
> the data? It will be easier to see what you are trying to do if you add the
> year column
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
>
> On Thu, May 7, 2020 at 10:26 AM Aakash Basu 
> wrote:
>
> Hi,
>
> I've described the problem in Stack Overflow with a lot of detailing, can
> you kindly check and help if possible?
>
> https://stackoverflow.com/q/61643910/5536733
>
> I'd be absolutely fine if someone solves it using Spark SQL APIs rather
> than plain spark SQL query.
>
> Thanks,
> Aakash.
>
>


Re: How to populate all possible combination values in columns using Spark SQL

2020-05-07 Thread Aakash Basu
Hi,

I've updated the SO question with masked data, added year column and other
requirement. Please take a look.

Hope this helps in solving the problem.

Thanks and regards,
AB

On Thu 7 May, 2020, 10:59 AM Sonal Goyal,  wrote:

> As mentioned in the comments on SO, can you provide a (masked) sample of
> the data? It will be easier to see what you are trying to do if you add the
> year column
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
>
> On Thu, May 7, 2020 at 10:26 AM Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I've described the problem in Stack Overflow with a lot of detailing, can
>> you kindly check and help if possible?
>>
>> https://stackoverflow.com/q/61643910/5536733
>>
>> I'd be absolutely fine if someone solves it using Spark SQL APIs rather
>> than plain spark SQL query.
>>
>> Thanks,
>> Aakash.
>>
>


How to populate all possible combination values in columns using Spark SQL

2020-05-06 Thread Aakash Basu
Hi,

I've described the problem in Stack Overflow with a lot of detailing, can
you kindly check and help if possible?

https://stackoverflow.com/q/61643910/5536733

I'd be absolutely fine if someone solves it using Spark SQL APIs rather
than plain spark SQL query.

Thanks,
Aakash.


Which SQL flavor does Spark SQL follow?

2020-05-06 Thread Aakash Basu
Hi,

Wish to know, which type of SQL syntax is followed when we write a plain
SQL query inside spark.sql? Is it MySQL or PGSQL? I know it isn't SQL
Server or Oracle as while migrating, had to convert a lot of SQL functions.

Also if you can provide a documentation which clearly says the above would
help.

Thanks,
AB


unix_timestamp() equivalent in plain Spark SQL Query

2020-04-02 Thread Aakash Basu
Hi,

What is the unix_timestamp() function equivalent in a plain spark SQL query?

I want to subtract one timestamp column from another, but in plain SQL am
getting error "Should be numeric or calendarinterval and not timestamp."
But when I did through the above function inaide withColumn, it worked. But
I want to do if in case and am more handy with plain SQL rather than Spark
SQL functions.

Any guidance?

Thanks,
Aakash.


INTERVAL function not working

2020-04-02 Thread Aakash Basu
Hi,

Am unable to solve a comparison between two timestamp field's difference
and a particular interval of time in Spark SQL.

I've asked rhe question here: https://stackoverflow.com/questions/60995744

Thanks,
Aakash.


Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Don't you have a date/timestamp to handle updates? So, you're talking about
CDC? If you've Datestamp you can check if that/those key(s) exists, if
exists then check if timestamp matches, if that matches, then ignore, if
that doesn't then update.

On Thu 30 May, 2019, 7:11 AM Genieliu,  wrote:

> Isn't step1 and step2 producing the copy of Table A?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Why don't you simply copy whole of delta data (Table A) into a stage table
(temp table in your case) and insert depending on a *WHERE NOT EXISTS* check
on primary key/composite key which already exists in the table B?

That's faster and does the reconciliation job smoothly enough.

Others, any better input?

On Wed 29 May, 2019, 10:50 PM Tomasz Krol,  wrote:

> Hey Guys,
>
> I am wondering what would be your approach to following scenario:
>
> I have two tables - one (Table A) is relatively small (e.g 50GB) and
> second one (Table B) much bigger (e.g. 3TB). Both are parquet tables.
>
>  I want to ADD all records from Table A to Table B which dont exist in
> Table B yet. I use only one field (e.g. key) to check existence for
> specific record.
>
> Then I want to UPDATE (by values from Table A) all records in Table B
> which also exist in Table A. To determine if specific record exist I use
> also the same "key" field.
>
> To achieve above I run following sql queries:
>
> 1. Find existing records and insert into temp table
>
> insert into temp_table select a.cols from Table A a left semi join Table B
> b on a.key = b.key
>
> 2. Find new records and insert them into temp table
>
> insert into temp_table select a.cols from Table A a left anti join Table B
> b on a.key = b.key
>
> 3. Find existing records in Table B which dont exist in   Table A
>
> insert into temp_table select b.cols from Table B b left anti join Table A
> a a.key = b. key
>
> In that way I built Table B updated with records from Table A.
> However, the problem here is the step 3, because I am inserting almost 3
> TB of data that takes obviously some time.
> I was trying different approaches but no luck.
>
> I am wondering whats your ideas how can we perform this scenario
> efficiently in Spark?
>
> Cheers
>
> Tom
> --
> Tomasz Krol
> patric...@gmail.com
>


Fetching LinkedIn data into PySpark using OAuth2.0

2019-05-20 Thread Aakash Basu
Hi,

Just curious to know if anyone was successful in connecting LinkedIn using
OAuth2.0, client ID and client secret to fetch data and process in
Python/PySpark.

I'm getting stuck at connection establishment.

Any help?

Thanks,
Aakash.


Data growth vs Cluster Size planning

2019-02-11 Thread Aakash Basu
Hi,

I ran a dataset of *200 columns and 0.2M records* in a cluster of *1 master
18 GB, 2 slaves 32 GB each, **16 cores/slave*, took around *772 minutes*
for a *very large ML tuning based job* (training).

Now, my requirement is to run the *same operation on 3M records*. Any idea
on how we should proceed? Should we go for a vertical scaling or a
horizontal one? How should this problem be approached in a
stepwise/systematic manner?

Thanks in advance.

Regards,
Aakash.


Avoiding collect but use foreach

2019-01-31 Thread Aakash Basu
Hi,

This:


*to_list = [list(row) for row in df.collect()]*


Gives:


[[5, 1, 1, 1, 2, 1, 3, 1, 1, 0], [5, 4, 4, 5, 7, 10, 3, 2, 1, 0], [3, 1, 1,
1, 2, 2, 3, 1, 1, 0], [6, 8, 8, 1, 3, 4, 3, 7, 1, 0], [4, 1, 1, 3, 2, 1, 3,
1, 1, 0]]


I want to avoid collect operation, but still convert the dataframe to a
python list of list just as above for downstream operations.


Is there a way, I can do it, maybe a better performant code that using
collect?


Thanks,

Aakash.


Silly Spark SQL query

2019-01-28 Thread Aakash Basu
Hi,

How to do this when the column (malignant and prediction) names are stored
in two respective variables?


tp = test_transformed[(test_transformed.malignant == 1) &
(test_transformed.prediction == 1)].count()



Thanks,
Aakash.


Re: Silly Spark SQL query

2019-01-28 Thread Aakash Basu
Well, it is done.


Using:

ma = "malignant"
pre = "prediction"
tp_test = test_transformed.filter((col(ma) == "1") & (col(pre) ==
"1")).count()

On Mon, Jan 28, 2019 at 5:41 PM Aakash Basu 
wrote:

> Hi,
>
> How to do this when the column (malignant and prediction) names are stored
> in two respective variables?
>
>
> tp = test_transformed[(test_transformed.malignant == 1) &
> (test_transformed.prediction == 1)].count()
>
>
>
> Thanks,
> Aakash.
>


Re: How to Overwrite a saved PySpark ML Model

2019-01-21 Thread Aakash Basu
Hey all,

The message seems to be a Java error message, and not a Python one. So, now
I tried by calling the writemethod first:

lr_model.write().overwrite().save(input_dict["config"]["save_model_path"])

It is still running, shall update if it works, otherwise shall need your
help.

Thanks,
Aakash.

On Mon, Jan 21, 2019 at 5:14 PM Aakash Basu 
wrote:

> Hi,
>
> I am trying to overwrite a Spark ML Logistic Regression Model, but it
> isn't working.
>
> Tried:
> a) lr_model.write.overwrite().save(input_dict["config"]["save_model_path"])
> and
> b) lr_model.write.overwrite.save(input_dict["config"]["save_model_path"])
>
> This works (if I do not want to overwrite):
> lr_model.save(input_dict["config"]["save_model_path"])
>
> Error:
> [2019-01-21] [04:57:44] [ERROR]  - AttributeError at line 163 of
> 'function' object has no attribute 'overwrite': 'function' object has no
> attribute 'overwrite'
> [2019-01-21] [04:57:44] [ERROR]  - 'function' object has no attribute
> 'overwrite'
> [2019-01-21] [04:57:44] [ERROR]  - Traceback (most recent call last):
>   File "", line 163, in run
> AttributeError: 'function' object has no attribute 'overwrite'
>
> What to do?
>
> Thanks,
> Aakash.
>


How to Overwrite a saved PySpark ML Model

2019-01-21 Thread Aakash Basu
Hi,

I am trying to overwrite a Spark ML Logistic Regression Model, but it isn't
working.

Tried:
a) lr_model.write.overwrite().save(input_dict["config"]["save_model_path"])
and
b) lr_model.write.overwrite.save(input_dict["config"]["save_model_path"])

This works (if I do not want to overwrite):
lr_model.save(input_dict["config"]["save_model_path"])

Error:
[2019-01-21] [04:57:44] [ERROR]  - AttributeError at line 163 of 'function'
object has no attribute 'overwrite': 'function' object has no attribute
'overwrite'
[2019-01-21] [04:57:44] [ERROR]  - 'function' object has no attribute
'overwrite'
[2019-01-21] [04:57:44] [ERROR]  - Traceback (most recent call last):
  File "", line 163, in run
AttributeError: 'function' object has no attribute 'overwrite'

What to do?

Thanks,
Aakash.


Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-21 Thread Aakash Basu
Any help, anyone?

On Fri, Dec 21, 2018 at 2:21 PM Aakash Basu 
wrote:

> Hey Shuporno,
>
> With the updated config too, I am getting the same error. While trying to
> figure that out, I found this link which says I need aws-java-sdk (which I
> already have):
> https://github.com/amazon-archives/kinesis-storm-spout/issues/8
>
> Now, this is my java details:
>
> java version "1.8.0_181"
>
> Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>
> Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
>
>
>
> Is it due to some java version mismatch then or is it something else I am
> missing out? What do you think?
>
> Thanks,
> Aakash.
>
> On Fri, Dec 21, 2018 at 1:43 PM Shuporno Choudhury <
> shuporno.choudh...@gmail.com> wrote:
>
>> Hi,
>> I don't know whether the following config (that you have tried) are
>> correct:
>> fs.s3a.awsAccessKeyId
>> fs.s3a.awsSecretAccessKey
>>
>> The correct ones probably are:
>> fs.s3a.access.key
>> fs.s3a.secret.key
>>
>> On Fri, 21 Dec 2018 at 13:21, Aakash Basu-2 [via Apache Spark User List] <
>> ml+s1001560n34217...@n3.nabble.com> wrote:
>>
>>> Hey Shuporno,
>>>
>>> Thanks for a prompt reply. Thanks for noticing the silly mistake, I
>>> tried this out, but still getting another error, which is related to
>>> connectivity it seems.
>>>
>>> >>> hadoop_conf.set("fs.s3a.awsAccessKeyId", "abcd")
>>>> >>> hadoop_conf.set("fs.s3a.awsSecretAccessKey", "123abc")
>>>> >>> a =
>>>> spark.read.csv("s3a:///test-bucket/breast-cancer-wisconsin.csv",
>>>> header=True)
>>>> Traceback (most recent call last):
>>>>   File "", line 1, in 
>>>>   File
>>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
>>>> line 441, in csv
>>>> return
>>>> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>>>>   File
>>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>>>> line 1257, in __call__
>>>>   File
>>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
>>>> line 63, in deco
>>>> return f(*a, **kw)
>>>>   File
>>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>>>> line 328, in get_return_value
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o220.csv.
>>>> : java.lang.NoClassDefFoundError:
>>>> com/amazonaws/auth/AWSCredentialsProvider
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:348)
>>>> at
>>>> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>>>> at
>>>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
>>>> at
>>>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>>> at
>>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>>>> at
>>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>>>> at
>>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>>>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>>>> at
>>>> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
>>>> at
>>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>>>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMe

Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-21 Thread Aakash Basu
Hey Shuporno,

With the updated config too, I am getting the same error. While trying to
figure that out, I found this link which says I need aws-java-sdk (which I
already have):
https://github.com/amazon-archives/kinesis-storm-spout/issues/8

Now, this is my java details:

java version "1.8.0_181"

Java(TM) SE Runtime Environment (build 1.8.0_181-b13)

Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)



Is it due to some java version mismatch then or is it something else I am
missing out? What do you think?

Thanks,
Aakash.

On Fri, Dec 21, 2018 at 1:43 PM Shuporno Choudhury <
shuporno.choudh...@gmail.com> wrote:

> Hi,
> I don't know whether the following config (that you have tried) are
> correct:
> fs.s3a.awsAccessKeyId
> fs.s3a.awsSecretAccessKey
>
> The correct ones probably are:
> fs.s3a.access.key
> fs.s3a.secret.key
>
> On Fri, 21 Dec 2018 at 13:21, Aakash Basu-2 [via Apache Spark User List] <
> ml+s1001560n34217...@n3.nabble.com> wrote:
>
>> Hey Shuporno,
>>
>> Thanks for a prompt reply. Thanks for noticing the silly mistake, I tried
>> this out, but still getting another error, which is related to connectivity
>> it seems.
>>
>> >>> hadoop_conf.set("fs.s3a.awsAccessKeyId", "abcd")
>>> >>> hadoop_conf.set("fs.s3a.awsSecretAccessKey", "123abc")
>>> >>> a = spark.read.csv("s3a:///test-bucket/breast-cancer-wisconsin.csv",
>>> header=True)
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
>>> line 441, in csv
>>> return
>>> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>>> line 1257, in __call__
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
>>> line 63, in deco
>>> return f(*a, **kw)
>>>   File
>>> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>>> line 328, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o220.csv.
>>> : java.lang.NoClassDefFoundError:
>>> com/amazonaws/auth/AWSCredentialsProvider
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at
>>> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>>> at
>>> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
>>> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>> at
>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>>> at
>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>>> at
>>> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
>>> at
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
>>> at
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>>> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at py4j.Gateway.invoke(Gateway.java:282)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.Ga

Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-20 Thread Aakash Basu
Hey Shuporno,

Thanks for a prompt reply. Thanks for noticing the silly mistake, I tried
this out, but still getting another error, which is related to connectivity
it seems.

>>> hadoop_conf.set("fs.s3a.awsAccessKeyId", "abcd")
> >>> hadoop_conf.set("fs.s3a.awsSecretAccessKey", "123abc")
> >>> a = spark.read.csv("s3a:///test-bucket/breast-cancer-wisconsin.csv",
> header=True)
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
> line 441, in csv
> return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
> line 63, in deco
> return f(*a, **kw)
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o220.csv.
> : java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
> at
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.auth.AWSCredentialsProvider
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 28 more



Thanks,
Aakash.

On Fri, Dec 21, 2018 at 12:51 PM Shuporno Choudhury <
shuporno.choudh...@gmail.com> wrote:

>
>
> On Fri, 21 Dec 2018 at 12:47, Shuporno Choudhury <
> shuporno.choudh...@gmail.com> wrote:
>
>> Hi,
>> Your connection config uses 's3n' but your read command uses 's3a'.
>> The config for s3a are:
>> spark.hadoop.fs.s3a.access.key
>> spark.hadoop.fs.s3a.secret.key
>>
>> I feel this should solve the problem.
>>
>> On Fri, 21 Dec 2018 at 12:09, Aakash Basu-2 [via Apache Spark User List] <
>> ml+s1001560n34215...@n3.nabble.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to connect to AWS S3 and read a csv file (running POC) from
>>> a bucket.
>>>
>>> I have s3cmd and and being able to run ls and other operation from cli.
>>>
>>> *Present Configuration:*
>>> Python 3.7
>>> Spark 2.3.1
>>>
>>> *JARs added:*
>>> hadoop-aws-2.7.3.jar (in sync with the hadoop version used with spark)
>>> 

Connection issue with AWS S3 from PySpark 2.3.1

2018-12-20 Thread Aakash Basu
Hi,

I am trying to connect to AWS S3 and read a csv file (running POC) from a
bucket.

I have s3cmd and and being able to run ls and other operation from cli.

*Present Configuration:*
Python 3.7
Spark 2.3.1

*JARs added:*
hadoop-aws-2.7.3.jar (in sync with the hadoop version used with spark)
aws-java-sdk-1.11.472.jar

Trying out the following code:

>>> sc=spark.sparkContext
>
> >>> hadoop_conf=sc._jsc.hadoopConfiguration()
>
> >>> hadoop_conf.set("fs.s3n.awsAccessKeyId", "abcd")
>
> >>> hadoop_conf.set("fs.s3n.awsSecretAccessKey", "xyz123")
>
> >>> a = spark.read.csv("s3a://test-bucket/breast-cancer-wisconsin.csv",
> header=True)
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
> line 441, in csv
>
> return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
> line 63, in deco
>
> return f(*a, **kw)
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o33.csv.
>
> : java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>
> at
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
>
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>
> at
> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
>
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
>
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
>
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
>
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at py4j.Gateway.invoke(Gateway.java:282)
>
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.auth.AWSCredentialsProvider
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 28 more
>
>
> >>> a = spark.read.csv("s3a://test-bucket/breast-cancer-wisconsin.csv",
> header=True)
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
> line 441, in csv
>
> return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
> line 63, in deco
>
> return f(*a, **kw)
>
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o67.csv.
>
> : java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at
> 

Re: How to read remote HDFS from Spark using username?

2018-10-03 Thread Aakash Basu
If it is so, how to update/fix the firewall issue?

On Wed, Oct 3, 2018 at 1:14 PM Jörn Franke  wrote:

> Looks like a firewall issue
>
> Am 03.10.2018 um 09:34 schrieb Aakash Basu :
>
> The stacktrace is below -
>
> ---
>> Py4JJavaError Traceback (most recent call last)
>>  in ()
>> > 1 df = spark.read.load("hdfs://
>> 35.154.242.76:9000/auto-ml/projects/auto-ml-test__8503cdc4-21fc-4fae-87c1-5b879cafff71/data/breast-cancer-wisconsin.csv
>> ")
>> /opt/spark/python/pyspark/sql/readwriter.py in load(self, path, format,
>> schema, **options)
>>  164 self.options(**options)
>>  165 if isinstance(path, basestring):
>> --> 166 return self._df(self._jreader.load(path))
>>  167 elif path is not None:
>>  168 if type(path) != list:
>> /opt/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in
>> __call__(self, *args)
>>  1158 answer = self.gateway_client.send_command(command)
>>  1159 return_value = get_return_value(
>> -> 1160 answer, self.gateway_client, self.target_id, self.name)
>>  1161
>>  1162 for temp_arg in temp_args:
>> /opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>  61 def deco(*a, **kw):
>>  62 try:
>> ---> 63 return f(*a, **kw)
>>  64 except py4j.protocol.Py4JJavaError as e:
>>  65 s = e.java_exception.toString()
>> /opt/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in
>> get_return_value(answer, gateway_client, target_id, name)
>>  318 raise Py4JJavaError(
>>  319 "An error occurred while calling {0}{1}{2}.\n".
>> --> 320 format(target_id, ".", name), value)
>>  321 else:
>>  322 raise Py4JError(
>> Py4JJavaError: An error occurred while calling o244.load.
>> : java.net.ConnectException: Call From Sandeeps-MacBook-Pro.local/
>> 192.168.50.188 to ec2-35-154-242-76.ap-south-1.compute.amazonaws.com:9000
>> failed on connection exception: java.net.ConnectException: Connection
>> refused; For more details see:
>> http://wiki.apache.org/hadoop/ConnectionRefused
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>> at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>> at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
>> at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>> at
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:714)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Trave

Re: How to read remote HDFS from Spark using username?

2018-10-03 Thread Aakash Basu
taFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
> at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)
> at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712)
> at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
> at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
> at org.apache.hadoop.ipc.Client.call(Client.java:1451)
> ... 40 more


>
On Wed, Oct 3, 2018 at 12:32 PM Aakash Basu 
wrote:

> Hi,
>
> I have to read data stored in HDFS of a different machine and needs to be
> accessed through Spark for being read.
>
> How to do that? Full HDFS address along with port doesn't seem to work.
>
> Anyone did it before?
>
> Thanks,
> AB.
>


How to read remote HDFS from Spark using username?

2018-10-03 Thread Aakash Basu
Hi,

I have to read data stored in HDFS of a different machine and needs to be
accessed through Spark for being read.

How to do that? Full HDFS address along with port doesn't seem to work.

Anyone did it before?

Thanks,
AB.


Re: Time-Series Forecasting

2018-09-19 Thread Aakash Basu
Hey,

Even though I'm more of a Data Engineer than Data Scientist, but still, I
work closely with the DS guys extensively on Spark ML, it is something
which they're still working on following the scikit-learn trend, but, I
never saw Spark handling Time-Series problems. Talking about both
Scala-Spark and PySpark.

So, in short, I think it is yet to be added in the future releases of
Spark, that too, Scala-Spark will get the first release and then they'll
come to other language APIs in future minor releases as per need, usage and
importance.

Best,
AB.

On Thu 20 Sep, 2018, 4:43 AM ayan guha,  wrote:

> Hi
>
> I work mostly in data engineering and trying to promote use of sparkR
> within the company I recently joined. Some of the users are working around
> forecasting a bunch of things and want to use SparklyR as they found time
> series implementation is better than SparkR.
>
> Does anyone have a point of view regarding this? Is SparklyR is better
> than SparkR in certain use cases?
>
> On Thu, Sep 20, 2018 at 4:07 AM, Mina Aslani  wrote:
>
>> Hi,
>>
>> Thank you for your quick response, really appreciate it.
>>
>> I just started learning TimeSeries forecasting, and I may try different
>> methods and observe their predictions/forecasting.However, my
>> understanding is that below methods are needed:
>>
>> - Smoothing
>> - Decomposing(e.g. remove/separate trend/seasonality)
>> - AR Model/MA Model/Combined Model (e.g. ARMA, ARIMA)
>> - ACF (Autocorrelation Function)/PACF (Partial Autocorrelation Function)
>> - Recurrent Neural Network (LSTM: Long Short Term Memory)
>>
>> Kindest regards,
>> Mina
>>
>>
>>
>> On Wed, Sep 19, 2018 at 12:55 PM Jörn Franke 
>> wrote:
>>
>>> What functionality do you need ? Ie which methods?
>>>
>>> > On 19. Sep 2018, at 18:01, Mina Aslani  wrote:
>>> >
>>> > Hi,
>>> > I have a question for you. Do we have any Time-Series Forecasting
>>> library in Spark?
>>> >
>>> > Best regards,
>>> > Mina
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Should python-2 be supported in Spark 3.0?

2018-09-16 Thread Aakash Basu
Removing support for an API in a major release makes poor sense,
deprecating is always better. Removal can always be done two - three minor
release later.

On Mon 17 Sep, 2018, 6:49 AM Felix Cheung, 
wrote:

> I don’t think we should remove any API even in a major release without
> deprecating it first...
>
>
> --
> *From:* Mark Hamstra 
> *Sent:* Sunday, September 16, 2018 12:26 PM
> *To:* Erik Erlandson
> *Cc:* user@spark.apache.org; dev
> *Subject:* Re: Should python-2 be supported in Spark 3.0?
>
> We could also deprecate Py2 already in the 2.4.0 release.
>
> On Sat, Sep 15, 2018 at 11:46 AM Erik Erlandson 
> wrote:
>
>> In case this didn't make it onto this thread:
>>
>> There is a 3rd option, which is to deprecate Py2 for Spark-3.0, and
>> remove it entirely on a later 3.x release.
>>
>> On Sat, Sep 15, 2018 at 11:09 AM, Erik Erlandson 
>> wrote:
>>
>>> On a separate dev@spark thread, I raised a question of whether or not
>>> to support python 2 in Apache Spark, going forward into Spark 3.0.
>>>
>>> Python-2 is going EOL  at
>>> the end of 2019. The upcoming release of Spark 3.0 is an opportunity to
>>> make breaking changes to Spark's APIs, and so it is a good time to consider
>>> support for Python-2 on PySpark.
>>>
>>> Key advantages to dropping Python 2 are:
>>>
>>>- Support for PySpark becomes significantly easier.
>>>- Avoid having to support Python 2 until Spark 4.0, which is likely
>>>to imply supporting Python 2 for some time after it goes EOL.
>>>
>>> (Note that supporting python 2 after EOL means, among other things, that
>>> PySpark would be supporting a version of python that was no longer
>>> receiving security patches)
>>>
>>> The main disadvantage is that PySpark users who have legacy python-2
>>> code would have to migrate their code to python 3 to take advantage of
>>> Spark 3.0
>>>
>>> This decision obviously has large implications for the Apache Spark
>>> community and we want to solicit community feedback.
>>>
>>>
>>


Local vs Cluster

2018-09-14 Thread Aakash Basu
Hi,

What is the Spark cluster equivalent of standalone's local[N]. I mean, the
value we set as a parameter of local as N, which parameter takes it in the
cluster mode?

Thanks,
Aakash.


[Help] Set nThread in Spark cluster

2018-09-12 Thread Aakash Basu
Hi,

API = ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbParam = Map("eta" -> 0.1f,

 |   "max_depth" -> 2,

 |   "objective" -> "multi:softprob",

 |   "num_class" -> 3,

 |   "num_round" -> 100,

 |   "num_workers" -> 2)

I'm running a job which will not work until the number of threads of the
API is equivalent to the num_worker set for Spark.

So, in master = local mode, when I do --master local[n] and also set
num_worker for that API as the same value as n, it works.

But, in cluster I do not know which parameter to control which precisely
takes the call of handling the number of threads. I tried with -

1) spark.task.cpus
2) spark.default.parallelism
3) executor cores

But, none of them works, and the speciality of this issue is, it goes into
a halt while distributing the XGBoost model if the above condition is not
met.


My code is as follows, it works in local mode, but not in cluster, any help?

Code:


>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *import org.apache.spark.sql.types.StringTypeimport
> org.apache.spark.sql.types.DoubleTypeimport
> org.apache.spark.sql.types.StructFieldimport
> org.apache.spark.sql.types.StructTypeval schema = new StructType(Array(
> StructField("sepal length", DoubleType, true),  StructField("sepal width",
> DoubleType, true),  StructField("petal length", DoubleType, true),
> StructField("petal width", DoubleType, true),  StructField("class",
> StringType, true)))val rawInput =
> spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")import
> org.apache.spark.ml.feature.StringIndexerval stringIndexer = new
> StringIndexer().  setInputCol("class").  setOutputCol("classIndex").
> fit(rawInput)val labelTransformed =
> stringIndexer.transform(rawInput).drop("class")import
> org.apache.spark.ml.feature.VectorAssemblerval vectorAssembler = new
> VectorAssembler().  setInputCols(Array("sepal length", "sepal width",
> "petal length", "petal width")).  setOutputCol("features")val xgbInput =
> vectorAssembler.transform(labelTransformed).select("features",
> "classIndex")import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifierval
> xgbParam = Map("eta" -> 0.1f,  "max_depth" -> 2,  "objective" ->
> "multi:softprob",  "num_class" -> 3,  "num_round" -> 100,
> "num_workers" -> 2)val xgbClassifier = new XGBoostClassifier(xgbParam).
>   setFeaturesCol("features").  setLabelCol("classIndex")val
> xgbClassificationModel = xgbClassifier.fit(xgbInput)*



Link of the same question I posted in stackoverflow:
https://stackoverflow.com/questions/52290938/set-nthread-in-spark-cluster-for-xgboost

Thanks,
Aakash.


Which Py4J version goes with Spark 2.3.1?

2018-08-29 Thread Aakash Basu
Hi,

Which Py4J version goes with Spark 2.3.1? I have py4j-0.10.7 but throws an
error because of certain compatibility issues with the Spark 2.3.1.

Error:

[2018-08-29] [06:46:56] [ERROR] - Traceback (most recent call last): File
"", line 120, in run File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
line 441, in csv return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__ answer, self.gateway_client, self.target_id,
self.name) File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 63, in deco return f(*a, **kw) File
"/data/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
line 328, in get_return_value format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o49.csv.

Any help?

Thanks,
Aakash.


RDD Collect Issue

2018-08-28 Thread Aakash Basu
Hi,

I configured a new system, spark 2.3.0, python 3.6.0, dataframe read and
other operations working as expected.

But, RDD collect is failing -

distFile = 
spark.sparkContext.textFile("/Users/aakash/Documents/Final_HOME_ORIGINAL/Downloads/PreloadedDataset/breast-cancer-wisconsin.csv")

distFile.collect()


Error:* py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.*

Traceback:

Traceback (most recent call last):
  File
"/Users/aakash/Documents/Final_HOME_ORIGINAL/PycharmProjects/AllMyRnD/BB_AutoML_Blocks/Test.py",
line 15, in 
distFile.collect()
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 824, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py",
line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/sql/utils.py",
line 63, in deco
return f(*a, **kw)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/protocol.py",
line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.(Unknown Source)
at org.apache.xbean.asm5.ClassReader.(Unknown Source)
at org.apache.xbean.asm5.ClassReader.(Unknown Source)
at
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
at
org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at
scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at
org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
at
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2066)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)


I followed this solution for similar problem, (
https://stackoverflow.com/questions/49167656/error-while-running-collect-in-pyspark)
installed latest Java, but still of no use.

What to do?



How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Aakash Basu
Hi all,

The requirement is, to process file using Spark Streaming fed from Kafka
Topic and once all the transformations are done, make it a batch of static
dataframe and pass it into a Spark ML Model tuning.

As of now, I had been doing it in the below fashion -

1) Read the file using Kafka
2) Consume it in Spark using a streaming dataframe
3) Run spark transformation jobs on streaming data
4) Append and write on HDFS.
5) Read the transformed file as batch in Spark
6) Run Spark ML Model

But, the requirement is to avoid use of HDFS as it may not be installed in
certain clusters, so, we've to avoid the disk I/O and do it on the fly from
Kafka to append in a spark static DF and hence pass that DF to the ML Model.

How to go about it?

Thanks,
Aakash.


Accessing a dataframe from another Singleton class (Python)

2018-08-13 Thread Aakash Basu
Hi,

I wanted to read a dataframe in one singleton class and and use that in
another singleton class.

Below is my code -

Class Singleton -

class Singleton(object):
  _instances = {}
  def __new__(class_, *args, **kwargs):
if class_ not in class_._instances:
class_._instances[class_] = super(Singleton,
class_).__new__(class_, *args, **kwargs)
return class_._instances[class_]


Class A -


from experiment.Singleton import Singleton

class ReadData(Singleton):

data = None
test_field=None

def run(self):
self.data = self.spark.read.option("header", "true").csv(data_path)

self.test_field = 234


Class B -


from experiment.Singleton import Singleton

class Clustering(Singleton):

data = None
test_field=None

def run(self):
read_class_object = ReadData()

number = read_class_object.test_field

print(number)
X = read_class_object.data
print('Showing the DATA READ at Clustering block - ', X)

X.show()


from test_df_bb.classA import ReadData
from test_df_bb.classB import Clustering

obj1 = ReadData()
obj2 = Clustering()

obj1.init()
obj1.run()

obj2.init()
obj2.run()


In the second class, the normal pythonic values like string and integers
are working but the dataframe is coming as None.

Is there any ideal way to call another class and get a dataframe it read
and do operations on several classes? Please help!


Thanks,
Aakash.


Re: How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
FYI

The relevant StackOverflow query on the same -
https://stackoverflow.com/questions/51610482/how-to-do-pca-with-spark-streaming-dataframe

On Tue, Jul 31, 2018 at 3:18 PM, Aakash Basu 
wrote:

> Hi,
>
> Just curious to know, how can we run a Principal Component Analysis on
> streaming data in distributed mode? If we can, is it mathematically valid
> enough?
>
> Have anyone done that before? Can you guys share your experience over it?
> Is there any API Spark provides to do the same on Spark Streaming mode?
>
> Thanks,
> Aakash.
>


How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
Hi,

Just curious to know, how can we run a Principal Component Analysis on
streaming data in distributed mode? If we can, is it mathematically valid
enough?

Have anyone done that before? Can you guys share your experience over it?
Is there any API Spark provides to do the same on Spark Streaming mode?

Thanks,
Aakash.


Re: Query on Profiling Spark Code

2018-07-31 Thread Aakash Basu
Okay, sure!

On Tue, Jul 31, 2018 at 1:06 PM, Patil, Prashasth <
prashasth.pa...@spglobal.com> wrote:

> Hi Aakash,
>
> On a related note, you may want to try SparkLens for profiling which is
> quite helpful in my opinion.
>
>
>
>
>
> -Prash
>
>
>
> *From:* Aakash Basu [mailto:aakash.spark@gmail.com]
> *Sent:* Tuesday, July 17, 2018 12:41 PM
> *To:* user
> *Subject:* Query on Profiling Spark Code
>
>
>
> Hi guys,
>
>
>
> I'm trying to profile my Spark code on cProfiler and check where more time
> is taken. I found the most time taken is by some socket object, which I'm
> quite clueless of, as to where it is used.
>
>
>
> Can anyone shed some light on this?
>
>
>
>
>
> *ncalls*
>
> *tottime*
>
> *percall*
>
> *cumtime*
>
> *percall*
>
> *filename:lineno(function)*
>
> 11789
>
> 479.8
>
> 0.0407
>
> 479.8
>
> 0.0407
>
> ~:0()
>
>
>
>
>
> Thanks,
>
> Aakash.
>
> --
>
> The information contained in this message is intended only for the
> recipient, and may be a confidential attorney-client communication or may
> otherwise be privileged and confidential and protected from disclosure. If
> the reader of this message is not the intended recipient, or an employee or
> agent responsible for delivering this message to the intended recipient,
> please be aware that any dissemination or copying of this communication is
> strictly prohibited. If you have received this communication in error,
> please immediately notify us by replying to the message and deleting it
> from your computer. S Global Inc. reserves the right, subject to
> applicable local law, to monitor, review and process the content of any
> electronic message or information sent to or from S Global Inc. e-mail
> addresses without informing the sender or recipient of the message. By
> sending electronic message or information to S Global Inc. e-mail
> addresses you, as the sender, are consenting to S Global Inc. processing
> any of your personal data therein.
>


Query on Profiling Spark Code

2018-07-17 Thread Aakash Basu
Hi guys,

I'm trying to profile my Spark code on cProfiler and check where more time
is taken. I found the most time taken is by some socket object, which I'm
quite clueless of, as to where it is used.

Can anyone shed some light on this?


ncallstottimepercallcumtimepercallfilename:lineno(function)
11789 479.8 0.0407 479.8 0.0407 ~:0()


Thanks,
Aakash.


Re: Inferring Data driven Spark parameters

2018-07-04 Thread Aakash Basu
I do not want to change executor/driver cores/memory on the fly in a single
Spark job, all I want is to make them cluster specific. So, I want to have
a formulae, with which, depending on the size of driver and executor
details, I can find out the values for them before submitting those details
in the spark-submit.

I, more or less know how to achieve the above as I've previously done that.

All I need to do is, I want to tweak the other spark confs depending on the
data. Is that possible? I mean (just an example), if I have 100+ features,
I want to double my default spark.driver.maxResultSize to 2G, and similarly
for other configs. Can that be achieved by any means for a optimal run on
that kind of dataset? If yes, can I?

On Tue, Jul 3, 2018 at 6:28 PM, Vadim Semenov  wrote:

> You can't change the executor/driver cores/memory on the fly once
> you've already started a Spark Context.
> On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu 
> wrote:
> >
> > We aren't using Oozie or similar, moreover, the end to end job shall be
> exactly the same, but the data will be extremely different (number of
> continuous and categorical columns, vertical size, horizontal size, etc),
> hence, if there would have been a calculation of the parameters to arrive
> at a conclusion that we can simply get the data and derive the respective
> configuration/parameters, it would be great.
> >
> > On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke 
> wrote:
> >>
> >> Don’t do this in your job. Create for different types of jobs different
> jobs and orchestrate them using oozie or similar.
> >>
> >> On 3. Jul 2018, at 09:34, Aakash Basu 
> wrote:
> >>
> >> Hi,
> >>
> >> Cluster - 5 node (1 Driver and 4 workers)
> >> Driver Config: 16 cores, 32 GB RAM
> >> Worker Config: 8 cores, 16 GB RAM
> >>
> >> I'm using the below parameters from which I know the first chunk is
> cluster dependent and the second chunk is data/code dependent.
> >>
> >> --num-executors 4
> >> --executor-cores 5
> >> --executor-memory 10G
> >> --driver-cores 5
> >> --driver-memory 25G
> >>
> >>
> >> --conf spark.sql.shuffle.partitions=100
> >> --conf spark.driver.maxResultSize=2G
> >> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
> >> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
> >>
> >> I've come upto these values depending on my R on the properties and
> the issues I faced and hence the handles.
> >>
> >> My ask here is -
> >>
> >> 1) How can I infer, using some formula or a code, to calculate the
> below chunk dependent on the data/code?
> >> 2) What are the other usable properties/configurations which I can use
> to shorten my job runtime?
> >>
> >> Thanks,
> >> Aakash.
> >
> >
>
>
> --
> Sent from my iPhone
>


Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Thanks a ton!

On Tue, Jul 3, 2018 at 6:26 PM, Vadim Semenov  wrote:

> As typical `JAVA_OPTS` you need to pass as a single parameter:
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"
>
> Also you got an extra space in the parameter, there should be no space
> after the colon symbol
> On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu 
> wrote:
> >
> > Hi,
> >
> > I used the below in the Spark Submit for using G1GC -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
> >
> > Now, I want to use -XX: -ResizePLAB of the G1GC to control to avoid
> performance degradation caused by a large number of thread communications.
> >
> > How to do it? I tried submitting in the similar fashion -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
> >
> > Thanks,
> > Aakash.
>
>
>
> --
> Sent from my iPhone
>


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
We aren't using Oozie or similar, moreover, the end to end job shall be
exactly the same, but the data will be extremely different (number of
continuous and categorical columns, vertical size, horizontal size, etc),
hence, if there would have been a calculation of the parameters to arrive
at a conclusion that we can simply get the data and derive the respective
configuration/parameters, it would be great.

On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:

> Don’t do this in your job. Create for different types of jobs different
> jobs and orchestrate them using oozie or similar.
>
> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>
> Hi,
>
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
>
> I'm using the below parameters from which I know the first chunk is
> cluster dependent and the second chunk is data/code dependent.
>
> --num-executors 4
> --executor-cores 5
> --executor-memory 10G
> --driver-cores 5
> --driver-memory 25G
>
>
> --conf spark.sql.shuffle.partitions=100
> --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>
> I've come upto these values depending on my R on the properties and the
> issues I faced and hence the handles.
>
> My ask here is -
>
> *1) How can I infer, using some formula or a code, to calculate the below
> chunk dependent on the data/code?*
> *2) What are the other usable properties/configurations which I can use to
> shorten my job runtime?*
>
> Thanks,
> Aakash.
>
>


Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
Hi,

Cluster - 5 node (1 Driver and 4 workers)
Driver Config: 16 cores, 32 GB RAM
Worker Config: 8 cores, 16 GB RAM

I'm using the below parameters from which I know the first chunk is cluster
dependent and the second chunk is data/code dependent.

--num-executors 4
--executor-cores 5
--executor-memory 10G
--driver-cores 5
--driver-memory 25G


--conf spark.sql.shuffle.partitions=100
--conf spark.driver.maxResultSize=2G
--conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
--conf spark.scheduler.listenerbus.eventqueue.capacity=2

I've come upto these values depending on my R on the properties and the
issues I faced and hence the handles.

My ask here is -

*1) How can I infer, using some formula or a code, to calculate the below
chunk dependent on the data/code?*
*2) What are the other usable properties/configurations which I can use to
shorten my job runtime?*

Thanks,
Aakash.


[G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Hi,

I used the below in the Spark Submit for using G1GC -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"

Now, I want to use *-XX: -ResizePLAB *of the G1GC to control to avoid
performance degradation caused by a large number of thread communications.

How to do it? I tried submitting in the similar fashion -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
"spark.executor.extraJavaOptions=*-XX: -ResizePLAB*", but it doesn't work.

Thanks,
Aakash.


Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-20 Thread Aakash Basu
Hi Kazuaki,

It would be really difficult to produce a small S-A code to reproduce this
problem because, I'm running through a big pipeline of feature engineering
where I derive a lot of variables based on the present ones to kind of
explode the size of the table by many folds. Then, when I do any kind of
join, this error shoots up.

I tried with wholeStage.codegen=false, but that errors out the entire
program rather than running it with a lesser optimized code.

Any suggestion on how I can proceed towards a JIRA entry for this?

Thanks,
Aakash.

On Wed, Jun 20, 2018 at 9:41 PM, Kazuaki Ishizaki 
wrote:

> Spark 2.3 tried to split a large generated Java methods into small methods
> as possible. However, this report may remain places that generates a large
> method.
>
> Would it be possible to create a JIRA entry with a small stand alone
> program that can reproduce this problem? It would be very helpful that the
> community will address this problem.
>
> Best regards,
> Kazuaki Ishizaki
>
>
>
> From:vaquar khan 
> To:Eyal Zituny 
> Cc:Aakash Basu , user <
> user@spark.apache.org>
> Date:2018/06/18 01:57
> Subject:Re: [Help] Codegen Stage grows beyond 64 KB
> --
>
>
>
> Totally agreed with Eyal .
>
> The problem is that when Java programs generated using Catalyst from
> programs using DataFrame and Dataset are compiled into Java bytecode, the
> size of byte code of one method must not be 64 KB or more, This conflicts
> with the limitation of the Java class file, which is an exception that
> occurs.
>
> In order to avoid occurrence of an exception due to this restriction,
> within Spark, a solution is to split the methods that compile and make Java
> bytecode that is likely to be over 64 KB into multiple methods when
> Catalyst generates Java programs It has been done.
>
> Use persist or any other logical separation in pipeline.
>
> Regards,
> Vaquar khan
>
> On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny <*eyal.zit...@equalum.io*
> > wrote:
> Hi Akash,
> such errors might appear in large spark pipelines, the root cause is a
> 64kb jvm limitation.
> the reason that your job isn't failing at the end is due to spark fallback
> - if code gen is failing, spark compiler will try to create the flow
> without the code gen (less optimized)
> if you do not want to see this error, you can either disable code gen
> using the flag:  spark.sql.codegen.wholeStage= "false"
> or you can try to split your complex pipeline into several spark flows if
> possible
>
> hope that helps
>
> Eyal
>
> On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi,
>
> I already went through it, that's one use case. I've a complex and very
> big pipeline of multiple jobs under one spark session. Not getting, on how
> to solve this, as it is happening over Logistic Regression and Random
> Forest models, which I'm just using from Spark ML package rather than doing
> anything by myself.
>
> Thanks,
> Aakash.
>
> On Sun 17 Jun, 2018, 8:21 AM vaquar khan, <*vaquar.k...@gmail.com*
> > wrote:
> Hi Akash,
>
> Please check stackoverflow.
>
>
> *https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe*
> <https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe>
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi guys,
>
> I'm getting an error when I'm feature engineering on 30+ columns to create
> about 200+ columns. It is not failing the job, but the ERROR shows. I want
> to know how can I avoid this.
>
> Spark - 2.3.1
> Python - 3.6
>
> Cluster Config -
> 1 Master - 32 GB RAM, 16 Cores
> 4 Slaves - 16 GB RAM, 8 Cores
>
>
> Input data - 8 partitions of parquet file with snappy compression.
>
> My Spark-Submit -> spark-submit --master spark://*192.168.60.20:7077*
> <http://192.168.60.20:7077>--num-executors 4 --executor-cores 5
> --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf
> spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py
> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>
>
> Stack-Trace below -
>
> ERROR CodeGenerator:91 - failed to compile: 
> o

Way to avoid CollectAsMap in RandomForest

2018-06-20 Thread Aakash Basu
Hi,

I'm running RandomForest model from Spark ML API on a medium sized data
(2.25 million rows and 60 features), most of my time goes in the
CollectAsMap of RandomForest but I've no option to avoid it as it is in the
API.

Is there a way to cutshort my end to end runtime?



Thanks,
Aakash.


G1GC vs ParallelGC

2018-06-20 Thread Aakash Basu
Hi guys,

I just wanted to know, why my ParallelGC (*--conf
"spark.executor.extraJavaOptions=-XX:+UseParallelGC"*) in a very long Spark
ML Pipeline works faster than when I set G1GC (*--conf
"spark.executor.extraJavaOptions=-XX:+UseG1GC"*), even though the Spark
community suggests G1GC to be much better than the ParallelGC.

Any pointers?

Thanks,
Aakash.


Re: Best way to process this dataset

2018-06-19 Thread Aakash Basu
Georg, just asking, can Pandas handle such a big dataset? If that data is
further passed into using any of the sklearn modules?

On Tue, Jun 19, 2018 at 10:35 AM, Georg Heiler 
wrote:

> use pandas or dask
>
> If you do want to use spark store the dataset as parquet / orc. And then
> continue to perform analytical queries on that dataset.
>
> Raymond Xie  schrieb am Di., 19. Juni 2018 um
> 04:29 Uhr:
>
>> I have a 3.6GB csv dataset (4 columns, 100,150,807 rows), my environment
>> is 20GB ssd harddisk and 2GB RAM.
>>
>> The dataset comes with
>> User ID: 987,994
>> Item ID: 4,162,024
>> Category ID: 9,439
>> Behavior type ('pv', 'buy', 'cart', 'fav')
>> Unix Timestamp: span between November 25 to December 03, 2017
>>
>> I would like to hear any suggestion from you on how should I process the
>> dataset with my current environment.
>>
>> Thank you.
>>
>> **
>> *Sincerely yours,*
>>
>>
>> *Raymond*
>>
>


Fwd: StackOverFlow ERROR - Bulk interaction for many columns fail

2018-06-18 Thread Aakash Basu
*Correction, 60C2 * 3*


-- Forwarded message --
From: Aakash Basu 
Date: Mon, Jun 18, 2018 at 4:15 PM
Subject: StackOverFlow ERROR - Bulk interaction for many columns fail
To: user 


Hi,

When doing bulk interaction on around 60 columns, I want 3 columns to be
created out of each one of them, since it has a combination of 3, then it
becomes 60C2 * 3, which creates a lot of columns.

So, for a lesser than 50 - 60 columns, even though it takes time, it still
works fine, but, for a little larger number of columns, it throws this
error -

  File "/usr/local/lib/python3.5/dist-packages/backend/feature_
> extraction/cont_bulk_interactions.py", line 100, in bulktransformer_pairs
> df = df.withColumn(col_name, each_op(df[var_2], df[var_1]))
>   File 
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 1849, in withColumn
>   File "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.
> 10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File 
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.
> 10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o26887.withColumn.
> : java.lang.StackOverflowError
> at scala.collection.generic.GenericTraversableTemplate$
> class.genericBuilder(GenericTraversableTemplate.scala:70)
> at scala.collection.AbstractTraversable.genericBuilder(Traversable.
> scala:104)
> at scala.collection.generic.GenTraversableFactory$
> GenericCanBuildFrom.apply(GenTraversableFactory.scala:57)
> at scala.collection.generic.GenTraversableFactory$
> GenericCanBuildFrom.apply(GenTraversableFactory.scala:52)
> at scala.collection.TraversableLike$class.builder$
> 1(TraversableLike.scala:229)
> at scala.collection.TraversableLike$class.map(
> TraversableLike.scala:233)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>

What to do?

Thanks,
Aakash.


StackOverFlow ERROR - Bulk interaction for many columns fail

2018-06-18 Thread Aakash Basu
Hi,

When doing bulk interaction on around 60 columns, I want 3 columns to be
created out of each one of them, since it has a combination of 3, then it
becomes 60N2 * 3, which creates a lot of columns.

So, for a lesser than 50 - 60 columns, even though it takes time, it still
works fine, but, for a little larger number of columns, it throws this
error -

  File
> "/usr/local/lib/python3.5/dist-packages/backend/feature_extraction/cont_bulk_interactions.py",
> line 100, in bulktransformer_pairs
> df = df.withColumn(col_name, each_op(df[var_2], df[var_1]))
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
> line 1849, in withColumn
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 63, in deco
>   File
> "/appdata/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o26887.withColumn.
> : java.lang.StackOverflowError
> at
> scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:70)
> at
> scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:104)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:57)
> at
> scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:52)
> at
> scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:229)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>

What to do?

Thanks,
Aakash.


Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread Aakash Basu
Hi,

I already went through it, that's one use case. I've a complex and very big
pipeline of multiple jobs under one spark session. Not getting, on how to
solve this, as it is happening over Logistic Regression and Random Forest
models, which I'm just using from Spark ML package rather than doing
anything by myself.

Thanks,
Aakash.

On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:

> Hi Akash,
>
> Please check stackoverflow.
>
>
> https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu 
> wrote:
>
>> Hi guys,
>>
>> I'm getting an error when I'm feature engineering on 30+ columns to
>> create about 200+ columns. It is not failing the job, but the ERROR shows.
>> I want to know how can I avoid this.
>>
>> Spark - 2.3.1
>> Python - 3.6
>>
>> Cluster Config -
>> 1 Master - 32 GB RAM, 16 Cores
>> 4 Slaves - 16 GB RAM, 8 Cores
>>
>>
>> Input data - 8 partitions of parquet file with snappy compression.
>>
>> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
>> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
>> --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
>> spark.driver.maxResultSize=2G --conf
>> "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
>> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
>> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py >
>> /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>>
>> Stack-Trace below -
>>
>> ERROR CodeGenerator:91 - failed to compile:
>>> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
>>> Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> org.codehaus.janino.InternalCompilerException: Compiling
>>> "GeneratedClass": Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> at
>>> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>>> at
>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>

[Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread Aakash Basu
Hi guys,

I'm getting an error when I'm feature engineering on 30+ columns to create
about 200+ columns. It is not failing the job, but the ERROR shows. I want
to know how can I avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores


Input data - 8 partitions of parquet file with snappy compression.

My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
--num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
--driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
spark.driver.maxResultSize=2G --conf
"spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py >
/appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
> Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
> grows beyond 64 KB
> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
> Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
> grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
> at
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
> at
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
> at
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> at
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> at
> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> at
> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
> at
> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> at
> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> 

Understanding Event Timeline of Spark UI

2018-06-15 Thread Aakash Basu
Hi,

I've a job running which shows the Event Timeline as follows, I am trying
to guess the gaps between these single lines, they seem to be parallel but
not immediately sequential with other stages.

Any other insight from this, and what is the cluster doing during these
gaps?




Thanks,
Aakash.


Issue upgrading to Spark 2.3.1 (Maintenance Release)

2018-06-14 Thread Aakash Basu
Hi,

Downloaded the latest Spark version because the of the fix for "ERROR
AsyncEventQueue:70 - Dropping event from queue appStatus."

After setting environment variables and running the same code in PyCharm,
I'm getting this error, which I can't find a solution of.

Exception in thread "main" java.util.NoSuchElementException: key not found:
_PYSPARK_DRIVER_CONN_INFO_PATH
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at
org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:64)
at
org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Any help?

Thanks,
Aakash.


Re: Using G1GC in Spark

2018-06-14 Thread Aakash Basu
Thanks a lot Srinath, for your perpetual help.

On Thu, Jun 14, 2018 at 5:49 PM, Srinath C  wrote:

> You'll have to use "spark.executor.extraJavaOptions" configuration
> parameter:
> See documentation link
> <https://spark.apache.org/docs/latest/configuration.html#runtime-environment>
> .
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>
> Regards,
> Srinath.
>
>
> On Thu, Jun 14, 2018 at 4:44 PM Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I am trying to spark submit with G1GC for garbage collection, but it
>> isn't working.
>>
>> What is the way to deploy a spark job with G1GC?
>>
>> Tried -
>>
>> *spark-submit --master spark://192.168.60.20:7077
>> <http://192.168.60.20:7077> --conf -XX:+UseG1GC
>> /appdata/bblite-codebase/test.py*
>>
>> Didn't work.
>>
>> Tried -
>>
>> *spark-submit --master spark://192.168.60.20:7077
>> <http://192.168.60.20:7077> -XX:+UseG1GC /appdata/bblite-codebase/test.py*
>>
>> Still didn't work. Would appreciate a help on this.
>>
>> Thanks,
>> Aakash.
>>
>


Using G1GC in Spark

2018-06-14 Thread Aakash Basu
Hi,

I am trying to spark submit with G1GC for garbage collection, but it isn't
working.

What is the way to deploy a spark job with G1GC?

Tried -

*spark-submit --master spark://192.168.60.20:7077
 --conf -XX:+UseG1GC
/appdata/bblite-codebase/test.py*

Didn't work.

Tried -

*spark-submit --master spark://192.168.60.20:7077
 -XX:+UseG1GC /appdata/bblite-codebase/test.py*

Still didn't work. Would appreciate a help on this.

Thanks,
Aakash.


Crosstab/AproxQuantile Performance on Spark Cluster

2018-06-14 Thread Aakash Basu
Hi all,

Is the Event Timeline representing a good shape? I mean at a point, to
calculate WoE columns on categorical variables, I am having to do crosstab
on each column, and on a cluster of 4 nodes, it is taking time as I've 230+
columns and 60,000 rows. How to make it more performant?





Thanks,
Aakash.


Inferring from Event Timeline

2018-06-13 Thread Aakash Basu
Hi guys,

What all can be inferred by closely watching an event time-line in Spark
UI? I generally monitor the tasks taking more time and also how much in
parallel they're spinning.

What else?

Eg Event Time-line from Spark UI:


Thanks,
Aakash.


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-12 Thread Aakash Basu
Hi Srinath,

Thanks for such an elaborate reply. How to reduce the number of overall
tasks?

I found, after simply repartitioning the csv file into 8 parts and
converting it to parquet with snappy compression, helped not only in even
distribution of the tasks on all nodes, but also helped in bringing the end
to end job timing down to approx 0.8X of the prior run.

Query - Check if there are too many partitions in the RDD and tune it using
spark.sql.shuffle.partitions. How to do this? Because I have a huge
pipeline of memory and CPU intensive operations, which will ideally have
innumerable spark transformations. At which level should I apply the same?
My total tasks of an average dataset is going to around 2 millions
(approx), is it a bad show? How can I control? Do I need to re-factor my
entire Pipeline (series of codes) then?

Below is the new executors show while the updated run is taking place -




Thanks,
Aakash.

On Tue, Jun 12, 2018 at 2:14 PM, Srinath C  wrote:

> Hi Aakash,
>
> Can you check the logs for Executor ID 0? It was restarted on worker
> 192.168.49.39 perhaps due to OOM or something.
>
> Also observed that the number of tasks are high and unevenly distributed
> across the workers.
> Check if there are too many partitions in the RDD and tune it using
> spark.sql.shuffle.partitions.
> If the uneven distribution is still occurring then try repartitioning the
> data set using appropriate fields.
>
> Hope that helps.
> Regards,
> Srinath.
>
>
> On Tue, Jun 12, 2018 at 1:39 PM Aakash Basu 
> wrote:
>
>> Yes, but when I did increase my executor memory, the spark job is going
>> to halt after running a few steps, even though, the executor isn't dying.
>>
>> Data - 60,000 data-points, 230 columns (60 MB data).
>>
>> Any input on why it behaves like that?
>>
>> On Tue, Jun 12, 2018 at 8:15 AM, Vamshi Talla 
>> wrote:
>>
>>> Aakash,
>>>
>>> Like Jorn suggested, did you increase your test data set? If so, did you
>>> also update your executor-memory setting? It seems like you might exceeding
>>> the executor memory threshold.
>>>
>>> Thanks
>>> Vamshi Talla
>>>
>>> Sent from my iPhone
>>>
>>> On Jun 11, 2018, at 8:54 AM, Aakash Basu 
>>> wrote:
>>>
>>> Hi Jorn/Others,
>>>
>>> Thanks for your help. Now, data is being distributed in a proper way,
>>> but the challenge is, after a certain point, I'm getting this error, after
>>> which, everything stops moving ahead -
>>>
>>> 2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
>>> 192.168.49.39
>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.39=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=T0QyzG2Sufk0kktKK3U2BVsAszvhCzx%2FFNnXOxpiWPs%3D=0>:
>>> Remote RPC client disassociated. Likely due to containers exceeding
>>> thresholds, or network issues. Check driver logs for WARN messages.
>>>
>>> 
>>>
>>> How to avoid this scenario?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke 
>>> wrote:
>>>
>>>> If it is in kB then spark will always schedule it to one node. As soon
>>>> as it gets bigger you will see usage of more nodes.
>>>>
>>>> Hence increase your testing Dataset .
>>>>
>>>> On 11. Jun 2018, at 12:22, Aakash Basu 
>>>> wrote:
>>>>
>>>> Jorn - The code is a series of feature engineering and model tuning
>>>> operations. Too big to show. Yes, data volume is too low, it is in KBs,
>>>> just tried to experiment with a small dataset before going for a large one.
>>>>
>>>> Akshay - I ran with your suggested spark configurations, I get this
>>>> (the node changed, but the problem persists) -
>>>>
>>>> 
>>>>
>>>>
>>>>
>>>> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
>>>> wrote:
>>>>
>>>>> try
>>>>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>>>>> spark.scheduler.mode=FAIR
>>>>>
>>>>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu <
>>>>> aakash.spark@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>>>>> operations happening at one of the worker nodes and other two are simply
>>>>>> chilling out.
>>>>>>
>>>>>> Picture below puts light on that -
>>>>>>
>>>>>> How to properly distribute the load?
>>>>>>
>>>>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>>>>
>>>>>> *Cores - 6*
>>>>>> *RAM - 12 GB*
>>>>>> *HDD - 60 GB*
>>>>>>
>>>>>> My Spark Submit command is as follows -
>>>>>>
>>>>>> *spark-submit --master spark://192.168.49.37:7077
>>>>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.37%3A7077=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=wS4drWE7%2FAJFXoUL3w0OzIRNL54RLKRTeMUBB%2BY1B28%3D=0>
>>>>>> --num-executors 3 --executor-cores 5 --executor-memory 4G
>>>>>> /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>>>>
>>>>>> What to do?
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-12 Thread Aakash Basu
Yes, but when I did increase my executor memory, the spark job is going to
halt after running a few steps, even though, the executor isn't dying.

Data - 60,000 data-points, 230 columns (60 MB data).

Any input on why it behaves like that?

On Tue, Jun 12, 2018 at 8:15 AM, Vamshi Talla  wrote:

> Aakash,
>
> Like Jorn suggested, did you increase your test data set? If so, did you
> also update your executor-memory setting? It seems like you might exceeding
> the executor memory threshold.
>
> Thanks
> Vamshi Talla
>
> Sent from my iPhone
>
> On Jun 11, 2018, at 8:54 AM, Aakash Basu 
> wrote:
>
> Hi Jorn/Others,
>
> Thanks for your help. Now, data is being distributed in a proper way, but
> the challenge is, after a certain point, I'm getting this error, after
> which, everything stops moving ahead -
>
> 2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
> 192.168.49.39
> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.39=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=T0QyzG2Sufk0kktKK3U2BVsAszvhCzx%2FFNnXOxpiWPs%3D=0>:
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages.
>
> 
>
> How to avoid this scenario?
>
> Thanks,
> Aakash.
>
> On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke  wrote:
>
>> If it is in kB then spark will always schedule it to one node. As soon as
>> it gets bigger you will see usage of more nodes.
>>
>> Hence increase your testing Dataset .
>>
>> On 11. Jun 2018, at 12:22, Aakash Basu 
>> wrote:
>>
>> Jorn - The code is a series of feature engineering and model tuning
>> operations. Too big to show. Yes, data volume is too low, it is in KBs,
>> just tried to experiment with a small dataset before going for a large one.
>>
>> Akshay - I ran with your suggested spark configurations, I get this (the
>> node changed, but the problem persists) -
>>
>> 
>>
>>
>>
>> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
>> wrote:
>>
>>> try
>>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>>> spark.scheduler.mode=FAIR
>>>
>>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu >> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>>> operations happening at one of the worker nodes and other two are simply
>>>> chilling out.
>>>>
>>>> Picture below puts light on that -
>>>>
>>>> How to properly distribute the load?
>>>>
>>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>>
>>>> *Cores - 6*
>>>> *RAM - 12 GB*
>>>> *HDD - 60 GB*
>>>>
>>>> My Spark Submit command is as follows -
>>>>
>>>> *spark-submit --master spark://192.168.49.37:7077
>>>> <https://nam04.safelinks.protection.outlook.com/?url=http%3A%2F%2F192.168.49.37%3A7077=02%7C01%7C%7Cdc9886e0d4be43fdf0cb08d5cf9a6fda%7C84df9e7fe9f640afb435%7C1%7C0%7C636643184560233393=wS4drWE7%2FAJFXoUL3w0OzIRNL54RLKRTeMUBB%2BY1B28%3D=0>
>>>> --num-executors 3 --executor-cores 5 --executor-memory 4G
>>>> /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>>
>>>> What to do?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>
>>>
>>
>


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Aakash Basu
Hi Jorn/Others,

Thanks for your help. Now, data is being distributed in a proper way, but
the challenge is, after a certain point, I'm getting this error, after
which, everything stops moving ahead -

2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on
192.168.49.39: Remote RPC client disassociated. Likely due to containers
exceeding thresholds, or network issues. Check driver logs for WARN
messages.



How to avoid this scenario?

Thanks,
Aakash.

On Mon, Jun 11, 2018 at 4:16 PM, Jörn Franke  wrote:

> If it is in kB then spark will always schedule it to one node. As soon as
> it gets bigger you will see usage of more nodes.
>
> Hence increase your testing Dataset .
>
> On 11. Jun 2018, at 12:22, Aakash Basu  wrote:
>
> Jorn - The code is a series of feature engineering and model tuning
> operations. Too big to show. Yes, data volume is too low, it is in KBs,
> just tried to experiment with a small dataset before going for a large one.
>
> Akshay - I ran with your suggested spark configurations, I get this (the
> node changed, but the problem persists) -
>
> 
>
>
>
> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu 
> wrote:
>
>> try
>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf
>> spark.scheduler.mode=FAIR
>>
>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu 
>> wrote:
>>
>>> Hi,
>>>
>>> I have submitted a job on* 4 node cluster*, where I see, most of the
>>> operations happening at one of the worker nodes and other two are simply
>>> chilling out.
>>>
>>> Picture below puts light on that -
>>>
>>> How to properly distribute the load?
>>>
>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>>
>>> *Cores - 6*
>>> *RAM - 12 GB*
>>> *HDD - 60 GB*
>>>
>>> My Spark Submit command is as follows -
>>>
>>> *spark-submit --master spark://192.168.49.37:7077
>>> <http://192.168.49.37:7077> --num-executors 3 --executor-cores 5
>>> --executor-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py*
>>>
>>> What to do?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>>
>


[Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Aakash Basu
Hi,

I have submitted a job on* 4 node cluster*, where I see, most of the
operations happening at one of the worker nodes and other two are simply
chilling out.

Picture below puts light on that -

How to properly distribute the load?

My cluster conf (4 node cluster [1 driver; 3 slaves]) -

*Cores - 6*
*RAM - 12 GB*
*HDD - 60 GB*

My Spark Submit command is as follows -

*spark-submit --master spark://192.168.49.37:7077
 --num-executors 3 --executor-cores 5
--executor-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py*

What to do?

Thanks,
Aakash.


Re: Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
Fixed by adding 2 configurations in yarn-site,xml.

Thanks all!

On Fri, Jun 8, 2018 at 2:44 PM, Aakash Basu 
wrote:

> Hi,
>
> I fixed that problem by putting all the Spark JARS in spark-archive.zip
> and putting it in the HDFS (as that problem was happening for that reason) -
>
> But, I'm facing a new issue now, this is the new RPC error I get
> (Stack-Trace below) -
>
>
>
>
> 2018-06-08 14:26:43 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Running Spark version 2.3.0
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Submitted application:
> EndToEnd_FeatureEngineeringPipeline
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - SecurityManager:
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(bblite); groups with view permissions: Set(); users  with modify
> permissions: Set(bblite); groups with modify permissions: Set()
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'sparkDriver' on port 41957.
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering MapOutputTracker
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering BlockManagerMaster
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 - Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 -
> BlockManagerMasterEndpoint up
> 2018-06-08 14:26:45 INFO  DiskBlockManager:54 - Created local directory at
> /appdata/spark/tmp/blockmgr-7b035871-a1f7-47ff-aad8-f7a43367836e
> 2018-06-08 14:26:45 INFO  MemoryStore:54 - MemoryStore started with
> capacity 366.3 MB
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
> 2018-06-08 14:26:45 INFO  log:192 - Logging initialized @3659ms
> 2018-06-08 14:26:45 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
> 2018-06-08 14:26:45 INFO  Server:414 - Started @3733ms
> 2018-06-08 14:26:45 INFO  AbstractConnector:278 - Started
> ServerConnector@3080efb7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'SparkUI' on port 4040.
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@2c3409b5{/jobs,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7f1ba569{/jobs/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@493631a1{/jobs/job,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@6b12f33c{/jobs/job/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@490023da{/stages,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@31c3a862{/stages/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@4da2454f{/stages/stage,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@552f182d{/stages/stage/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@a78a7fa{/stages/pool,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@15142105{/stages/pool/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7589c977{/storage,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@584a599b{/storage/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1742621f{/storage/rdd,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@23ea75fb{/storage/rdd/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1813d280{/environment,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@129fc698{/environment/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  

Re: Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
Fixed by adding 2 configurations in yarn-site,xml.

Thanks all!


On Fri, Jun 8, 2018 at 2:44 PM, Aakash Basu 
wrote:

> Hi,
>
> I fixed that problem by putting all the Spark JARS in spark-archive.zip
> and putting it in the HDFS (as that problem was happening for that reason) -
>
> But, I'm facing a new issue now, this is the new RPC error I get
> (Stack-Trace below) -
>
>
>
>
> 2018-06-08 14:26:43 WARN  NativeCodeLoader:62 - Unable to load
> native-hadoop library for your platform... using builtin-java classes where
> applicable
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Running Spark version 2.3.0
> 2018-06-08 14:26:45 INFO  SparkContext:54 - Submitted application:
> EndToEnd_FeatureEngineeringPipeline
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls to:
> bblite
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing view acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - Changing modify acls groups
> to:
> 2018-06-08 14:26:45 INFO  SecurityManager:54 - SecurityManager:
> authentication disabled; ui acls disabled; users  with view permissions:
> Set(bblite); groups with view permissions: Set(); users  with modify
> permissions: Set(bblite); groups with modify permissions: Set()
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'sparkDriver' on port 41957.
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering MapOutputTracker
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering BlockManagerMaster
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 - Using
> org.apache.spark.storage.DefaultTopologyMapper for getting topology
> information
> 2018-06-08 14:26:45 INFO  BlockManagerMasterEndpoint:54 -
> BlockManagerMasterEndpoint up
> 2018-06-08 14:26:45 INFO  DiskBlockManager:54 - Created local directory at
> /appdata/spark/tmp/blockmgr-7b035871-a1f7-47ff-aad8-f7a43367836e
> 2018-06-08 14:26:45 INFO  MemoryStore:54 - MemoryStore started with
> capacity 366.3 MB
> 2018-06-08 14:26:45 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
> 2018-06-08 14:26:45 INFO  log:192 - Logging initialized @3659ms
> 2018-06-08 14:26:45 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
> 2018-06-08 14:26:45 INFO  Server:414 - Started @3733ms
> 2018-06-08 14:26:45 INFO  AbstractConnector:278 - Started
> ServerConnector@3080efb7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
> 2018-06-08 14:26:45 INFO  Utils:54 - Successfully started service
> 'SparkUI' on port 4040.
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@2c3409b5{/jobs,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7f1ba569{/jobs/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@493631a1{/jobs/job,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@6b12f33c{/jobs/job/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@490023da{/stages,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@31c3a862{/stages/json,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@4da2454f{/stages/stage,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@552f182d{/stages/stage/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@a78a7fa{/stages/pool,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@15142105{/stages/pool/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@7589c977{/storage,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@584a599b{/storage/json,null,
> AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1742621f{/storage/rdd,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@23ea75fb{/storage/rdd/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@1813d280{/environment,null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  ContextHandler:781 - Started
> o.s.j.s.ServletContextHandler@129fc698{/environment/json,
> null,AVAILABLE,@Spark}
> 2018-06-08 14:26:45 INFO  

Re: Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
p/py4j/java_gateway.py",
line 1428, in __call__
  File
"/appdata/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py",
line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Spark context stopped while waiting for
backend
at
org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:669)
at
org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:177)
at org.apache.spark.SparkContext.(SparkContext.scala:558)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

2018-06-08 14:26:59 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-06-08 14:26:59 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-06-08 14:26:59 INFO  SparkContext:54 - Successfully stopped
SparkContext
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/appdata/spark/tmp/spark-35d9709e-8f20-4b57-82d3 -f3ef0926d3ab
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-1b471b46-0c5a-4f75-94c1-c99d9d674228

Seems the name-node and data-nodes cannot talk to each other correctly,
why, no clue, anyone faced this problem, any help on this?

Thanks,
Aakash.


On Fri, Jun 8, 2018 at 2:31 PM, Sathishkumar Manimoorthy <
mrsathishkuma...@gmail.com> wrote:

> It seems, your spark-on-yarn application is not able to get it's
> application master,
>
> org.apache.spark.SparkException: Yarn application has already ended! It might 
> have been killed or unable to launch application master.
>
>
> Check once on yarn logs
>
> Thanks,
> Sathish-
>
>
> On Fri, Jun 8, 2018 at 2:22 PM, Jeff Zhang  wrote:
>
>>
>> Check the yarn AM log for details.
>>
>>
>>
>> Aakash Basu 于2018年6月8日周五 下午4:36写道:
>>
>>> Hi,
>>>
>>> Getting this error when trying to run Spark Shell using YARN -
>>>
>>> Command: *spark-shell --master yarn --deploy-mode client*
>>>
>>> 2018-06-08 13:39:09 WARN  Client:66 - Neither spark.yarn.jars nor 
>>> spark.yarn.archive is set, falling back to uploading libraries under 
>>> SPARK_HOME.
>>> 2018-06-08 13:39:25 ERROR SparkContext:91 - Error initializing SparkContext.
>>> org.apache.spark.SparkException: Yarn application has already ended! It 
>>> might have been killed or unable to launch application master.
>>>
>>>
>>> The last half of stack-trace -
>>>
>>> 2018-06-08 13:56:11 WARN  YarnSchedulerBackend$YarnSchedulerEndpoint:66 - 
>>> Attempted to request executors before the AM has registered!
>>> 2018-06-08 13:56:11 WARN  MetricsSystem:66 - Stopping a MetricsSystem that 
>>> is not running
>>> org.apache.spark.SparkException: Yarn application has already ended! It 
>>> might have been killed or unable to launch application master.
>>>   at 
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:89)
>>>   at 
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:63)
>>>   at 
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
>>>   at org.apache.spark.SparkContext.(SparkContext.scala:500)
>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>>>   at scala.Option.getOrElse(Option.scala:121)
>>>   at 
>>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
>>>   at org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
>>>   ... 55 elided
>>> :14: error: not found: value spark
>>>import spark.implicits._
>>>   ^
>>> :14: error: not found: value spark
>>>import spark.sql
>>>
>>>
>>> Tried putting the *spark-yarn_2.11-2.3.0.jar *in Hadoop yarn, still not
>>> working, anything else to do?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>


Re: Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
p/py4j/java_gateway.py",
line 1428, in __call__
  File
"/appdata/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py",
line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Spark context stopped while waiting for
backend
at
org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:669)
at
org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:177)
at org.apache.spark.SparkContext.(SparkContext.scala:558)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

2018-06-08 14:26:59 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-06-08 14:26:59 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-06-08 14:26:59 INFO  SparkContext:54 - Successfully stopped
SparkContext
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/appdata/spark/tmp/spark-35d9709e-8f20-4b57-82d3 -f3ef0926d3ab
2018-06-08 14:26:59 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-1b471b46-0c5a-4f75-94c1-c99d9d674228

Seems the name-node and data-nodes cannot talk to each other correctly,
why, no clue, anyone faced this problem, any help on this?

Thanks,
Aakash.

On Fri, Jun 8, 2018 at 2:17 PM, Saisai Shao  wrote:

> In Spark on YARN, error code 13 means SparkContext doesn't initialize in
> time. You can check the yarn application log to get more information.
>
> BTW, did you just write a plain python script without creating
> SparkContext/SparkSession?
>
> Aakash Basu  于2018年6月8日周五 下午4:15写道:
>
>> Hi,
>>
>> I'm trying to run a program on a cluster using YARN.
>>
>> YARN is present there along with HADOOP.
>>
>> Problem I'm running into is as below -
>>
>> Container exited with a non-zero exit code 13
>>> Failing this attempt. Failing the application.
>>>  ApplicationMaster host: N/A
>>>  ApplicationMaster RPC port: -1
>>>  queue: default
>>>  start time: 1528297574594
>>>  final status: FAILED
>>>  tracking URL: http://MasterNode:8088/cluster/app/application_
>>> 1528296308262_0004
>>>  user: bblite
>>> Exception in thread "main" org.apache.spark.SparkException: Application
>>> application_1528296308262_0004 finished with failed status
>>>
>>
>> I checked on the net and most of the stackoverflow problems say, that the
>> users have given *.master('local[*]')* in the code while invoking the
>> Spark Session and at the same time, giving *--master yarn* while doing
>> the spark-submit, hence they're getting the error due to conflict.
>>
>> But, in my case, I've not mentioned any master at all at the code. Just
>> trying to run it on yarn by giving *--master yarn* while doing the
>> spark-submit. Below is the code spark invoking -
>>
>> spark = SparkSession\
>> .builder\
>> .appName("Temp_Prog")\
>> .getOrCreate()
>>
>> Below is the spark-submit -
>>
>> *spark-submit --master yarn --deploy-mode cluster --num-executors 3
>> --executor-cores 6 --executor-memory 4G
>> /appdata/codebase/backend/feature_extraction/try_yarn.py*
>>
>> I've tried without --deploy-mode too, still no help.
>>
>> Thanks,
>> Aakash.
>>
>


Spark YARN Error - triggering spark-shell

2018-06-08 Thread Aakash Basu
Hi,

Getting this error when trying to run Spark Shell using YARN -

Command: *spark-shell --master yarn --deploy-mode client*

2018-06-08 13:39:09 WARN  Client:66 - Neither spark.yarn.jars nor
spark.yarn.archive is set, falling back to uploading libraries under
SPARK_HOME.
2018-06-08 13:39:25 ERROR SparkContext:91 - Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended!
It might have been killed or unable to launch application master.


The last half of stack-trace -

2018-06-08 13:56:11 WARN
YarnSchedulerBackend$YarnSchedulerEndpoint:66 - Attempted to request
executors before the AM has registered!
2018-06-08 13:56:11 WARN  MetricsSystem:66 - Stopping a MetricsSystem
that is not running
org.apache.spark.SparkException: Yarn application has already ended!
It might have been killed or unable to launch application master.
  at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:89)
  at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:63)
  at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:164)
  at org.apache.spark.SparkContext.(SparkContext.scala:500)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
  at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
  at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
  at scala.Option.getOrElse(Option.scala:121)
  at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:103)
  ... 55 elided
:14: error: not found: value spark
   import spark.implicits._
  ^
:14: error: not found: value spark
   import spark.sql


Tried putting the *spark-yarn_2.11-2.3.0.jar *in Hadoop yarn, still not
working, anything else to do?

Thanks,
Aakash.


Spark YARN job submission error (code 13)

2018-06-08 Thread Aakash Basu
Hi,

I'm trying to run a program on a cluster using YARN.

YARN is present there along with HADOOP.

Problem I'm running into is as below -

Container exited with a non-zero exit code 13
> Failing this attempt. Failing the application.
>  ApplicationMaster host: N/A
>  ApplicationMaster RPC port: -1
>  queue: default
>  start time: 1528297574594
>  final status: FAILED
>  tracking URL:
> http://MasterNode:8088/cluster/app/application_1528296308262_0004
>  user: bblite
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1528296308262_0004 finished with failed status
>

I checked on the net and most of the stackoverflow problems say, that the
users have given *.master('local[*]')* in the code while invoking the Spark
Session and at the same time, giving *--master yarn* while doing the
spark-submit, hence they're getting the error due to conflict.

But, in my case, I've not mentioned any master at all at the code. Just
trying to run it on yarn by giving *--master yarn* while doing the
spark-submit. Below is the code spark invoking -

spark = SparkSession\
.builder\
.appName("Temp_Prog")\
.getOrCreate()

Below is the spark-submit -

*spark-submit --master yarn --deploy-mode cluster --num-executors 3
--executor-cores 6 --executor-memory 4G
/appdata/codebase/backend/feature_extraction/try_yarn.py*

I've tried without --deploy-mode too, still no help.

Thanks,
Aakash.


Fundamental Question on Spark's distribution

2018-06-07 Thread Aakash Basu
Hi all,

*Query 1)*

Need a serious help! I'm running feature engineering of different types on
a dataset and trying to benchmark from by tweaking different types of Spark
properties.

I don't know where it is going wrong that a single machine is working
faster than a 3 node cluster, even though, most of the operations from code
are distributed.

The log I collected by running in different ways is -

Remote Spark Benchmarking (4 node cluster, 1 driver, 3 workers) -

Cluster details: 12 GB RAM, 6 cores each.

Medium data -> 1,00,000 sample (0.1 million rows) [Data placed in Local
File System, same path, same data on all worker nodes]
Runs -
1) Time Taken for the Feature Engineering Pipeline to finish:
482.20375990867615 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G
2) Time Taken for the Feature Engineering Pipeline to finish:
467.3759717941284 secs.; --num-executors 10 --executor-cores 6
--executor-memory 11G
3) Time Taken for the Feature Engineering Pipeline to finish:
459.885710477829 secs.; --num-executors 3 --executor-cores 6
--executor-memory 8G
4) Time Taken for the Feature Engineering Pipeline to finish:
476.61902809143066 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.memory.fraction=0.2
5) Time Taken for the Feature Engineering Pipeline to finish:
575.9314386844635 secs.; --num-executors 3 --executor-cores 5
--executor-memory 4G --conf spark.default.parallelism=200

Medium data -> 1,00,000 sample (0.1 million rows) [Data placed in Local
File System]
1) Time Taken for the Feature Engineering Pipeline to finish:
594.1818737983704 secs.
2) Time Taken for the Feature Engineering Pipeline to finish:
528.6015181541443 secs. (on single driver node [local])
3) Time Taken for the Feature Engineering Pipeline to finish:
323.6546362755467 secs. (on my laptop - 16GB RAM and 8 Cores).

*Query 2)*

The below is the event timeline of the same code taken from the Spark UI,
can you provide some insight on why there are two big gaps between the
parallel tasks? Does it mean, that time, there's no operation happening? I
am kind of new to Spark UI monitoring, can anyone suggest me other aspects
which needs to be monitored to optimize further?





Thanks,
Aakash.


[Spark Streaming] Distinct Count on unrelated columns

2018-06-06 Thread Aakash Basu
Hi guys,

Posted a question (link)

on StackOverflow, any help?


Thanks,
Aakash.


Re: Append In-Place to S3

2018-06-02 Thread Aakash Basu
As Jay suggested correctly, if you're joining then overwrite otherwise only
append as it removes dups.

I think, in this scenario, just change it to write.mode('overwrite')
because you're already reading the old data and your job would be done.


On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:

> Hi Jay,
>
> Thanks for your response. Are you saying to append the new data and then
> remove the duplicates to the whole data set afterwards overwriting the
> existing data set with new data set with appended values? I will give that
> a try.
>
> Cheers,
> Ben
>
> On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:
>
>> Benjamin,
>>
>> The append will append the "new" data to the existing data with removing
>> the duplicates. You would need to overwrite the file everytime if you need
>> unique values.
>>
>> Thanks,
>> Jayadeep
>>
>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>>
>>> I have a situation where I trying to add only new rows to an existing
>>> data set that lives in S3 as gzipped parquet files, looping and appending
>>> for each hour of the day. First, I create a DF from the existing data, then
>>> I use a query to create another DF with the data that is new. Here is the
>>> code snippet.
>>>
>>> df = spark.read.parquet(existing_data_path)
>>> df.createOrReplaceTempView(‘existing_data’)
>>> new_df = spark.read.parquet(new_data_path)
>>> new_df.createOrReplaceTempView(’new_data’)
>>> append_df = spark.sql(
>>> """
>>> WITH ids AS (
>>> SELECT DISTINCT
>>> source,
>>> source_id,
>>> target,
>>> target_id
>>> FROM new_data i
>>> LEFT ANTI JOIN existing_data im
>>> ON i.source = im.source
>>> AND i.source_id = im.source_id
>>> AND i.target = im.target
>>> AND i.target = im.target_id
>>> """
>>> )
>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>>> compression='gzip’)
>>>
>>>
>>> I thought this would append new rows and keep the data unique, but I am
>>> see many duplicates. Can someone help me with this and tell me what I am
>>> doing wrong?
>>>
>>> Thanks,
>>> Ben
>>>
>>


[Spark SQL] Efficiently calculating Weight of Evidence in PySpark

2018-06-01 Thread Aakash Basu
Hi guys,

Can anyone please let me know if you've any clue on this problem I posted
in StackOverflow -

https://stackoverflow.com/questions/50638911/how-to-efficiently-calculate-woe-in-pyspark

Thanks,
Aakash.


Fwd: [Help] PySpark Dynamic mean calculation

2018-05-31 Thread Aakash Basu
Solved it myself.

In-case anyone needs to reuse the code. Can be re-used.

orig_list = ['Married-spouse-absent', 'Married-AF-spouse',
'Separated', 'Married-civ-spouse', 'Widowed', 'Divorced',
'Never-married']
k_folds = 3

cols = df.columns  # ['fnlwgt_bucketed',
'Married-spouse-absent_fold_0', 'Married-AF-spouse_fold_0',
'Separated_fold_0', 'Married-civ-spouse_fold_0', 'Widowed_fold_0',
'Divorced_fold_0', 'Never-married_fold_0',
'Married-spouse-absent_fold_1', 'Married-AF-spouse_fold_1',
'Separated_fold_1', 'Married-civ-spouse_fold_1', 'Widowed_fold_1',
'Divorced_fold_1', 'Never-married_fold_1',
'Married-spouse-absent_fold_2', 'Married-AF-spouse_fold_2',
'Separated_fold_2', 'Married-civ-spouse_fold_2', 'Widowed_fold_2',
'Divorced_fold_2', 'Never-married_fold_2']

for folds in range(k_folds):
for column in orig_list:
col_namer = []
for fold in range(k_folds):
if fold != folds:
col_namer.append(column+'_fold_'+str(fold))
df = df.withColumn(column+'_fold_'+str(folds)+'_mean', (sum(df[col] for col in
col_namer)/(k_folds-1)))
print(col_namer)
df.show(1)



-- Forwarded message --
From: Aakash Basu 
Date: Thu, May 31, 2018 at 3:40 PM
Subject: [Help] PySpark Dynamic mean calculation
To: user 


Hi,

Using -
Python 3.6
Spark 2.3

Original DF -
key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2
1 1 2 3 4 5 6
2 7 5 3 5 2 1


I want to calculate means from the below  dataframe as follows (like this
for all columns and all folds) -

key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2 a_fold_0_mean
b_fold_0_mean a_fold_1_mean
1 1 2 3 4 5 6 3 + 5 / 2 4 + 6 / 2 1 + 5 / 2
2 7 5 3 5 2 1 3 + 2 / 2 5 + 1 / 2 7 + 2 / 2

Process -

For fold_0 my mean should be fold_1 + fold_2 / 2
For fold_1 my mean should be fold_0 + fold_2 / 2
For fold_2 my mean should be fold_0 + fold_1 / 2

For each column.

And my number of columns, no. of folds, everything would be dynamic.

How to go about this problem on a pyspark dataframe?

Thanks,
Aakash.


[Help] PySpark Dynamic mean calculation

2018-05-31 Thread Aakash Basu
Hi,

Using -
Python 3.6
Spark 2.3

Original DF -
key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2
1 1 2 3 4 5 6
2 7 5 3 5 2 1


I want to calculate means from the below  dataframe as follows (like this
for all columns and all folds) -

key a_fold_0 b_fold_0 a_fold_1 b_fold_1 a_fold_2 b_fold_2 a_fold_0_mean
b_fold_0_mean a_fold_1_mean
1 1 2 3 4 5 6 3 + 5 / 2 4 + 6 / 2 1 + 5 / 2
2 7 5 3 5 2 1 3 + 2 / 2 5 + 1 / 2 7 + 2 / 2

Process -

For fold_0 my mean should be fold_1 + fold_2 / 2
For fold_1 my mean should be fold_0 + fold_2 / 2
For fold_2 my mean should be fold_0 + fold_1 / 2

For each column.

And my number of columns, no. of folds, everything would be dynamic.

How to go about this problem on a pyspark dataframe?

Thanks,
Aakash.


[Suggestions needed] Weight of Evidence PySpark

2018-05-31 Thread Aakash Basu
Hi guys,

I'm trying to calculate WoE on a particular categorical column depending on
the target column. But the code is taking a lot of time on very few
datapoints (rows).

How can I optimize it to make it performant enough?

Here's the code (here categorical_col is a python list of columns) -

for item in categorical_col:
new_df = spark.sql('Select `' + item + '`, `' + target_col + '`,
count(*) as Counts from a group by `'
   + item + '`, `' + target_col + '` order by `' +
item + '`')
# new_df.show()
new_df.registerTempTable('b')
# exit(0)
new_df2 = spark.sql('Select `' + item + '`, ' +
'case when `' + target_col + '` == 0 then
Counts else 0 end as Count_0, ' +
'case when `' + target_col + '` == 1 then
Counts else 0 end as Count_1 ' +
'from b')

spark.catalog.dropTempView('b')
# new_df2.show()
new_df2.registerTempTable('c')
# exit(0)

new_df3 = spark.sql('SELECT `' + item + '`, SUM(Count_0) AS Count_0, ' +
'SUM(Count_1) AS Count_1 FROM c GROUP BY `' +
item + '`')

spark.catalog.dropTempView('c')
# new_df3.show()
# exit(0)

new_df3.registerTempTable('d')

# SQL DF Experiment
new_df4 = spark.sql('Select `' + item + '` as
bucketed_col_of_source, Count_0/(select sum(d.Count_0) as sum from d)
as Prop_0, ' +
'Count_1/(select sum(d.Count_1) as sum from d)
as Prop_1 from d')

spark.catalog.dropTempView('d')
# new_df4.show()
# exit(0)
new_df4.registerTempTable('e')

new_df5 = spark.sql('Select *, case when log(e.Prop_0/e.Prop_1) IS
NULL then 0 else log(e.Prop_0/e.Prop_1) end as WoE from e')

spark.catalog.dropTempView('e')

# print('Problem starts here: ')
# new_df5.show()

new_df5.registerTempTable('WoE_table')

joined_Train_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
  '_WoE` from a bucketed inner join WoE_table
on bucketed.`' + item +
  '` = WoE_table.bucketed_col_of_source')

# joined_Train_DF.show()
joined_Test_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
  '_WoE` from test_data bucketed inner join
WoE_table on bucketed.`' + item +
  '` = WoE_table.bucketed_col_of_source')

if validation:
joined_Validation_DF = spark.sql('Select bucketed.*,
WoE_table.WoE as `' + item +
   '_WoE` from validation_data
bucketed inner join WoE_table on bucketed.`' + item +
   '` = WoE_table.bucketed_col_of_source')
WoE_Validation_DF = joined_Validation_DF

spark.catalog.dropTempView('WoE_table')

WoE_Train_DF = joined_Train_DF
WoE_Test_DF = joined_Test_DF
col_len = len(categorical_col)
if col_len > 1:
WoE_Train_DF.registerTempTable('a')
WoE_Test_DF.registerTempTable('test_data')
if validation:
# print('got inside...')
WoE_Validation_DF.registerTempTable('validation_data')

Any help?

Thanks,
Aakash.


Spark AsyncEventQueue doubt

2018-05-27 Thread Aakash Basu
Hi,

I'm getting the below ERROR and WARN when running a little heavy
calculation on a dataset -

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> 2018-05-27 12:51:11 ERROR AsyncEventQueue:70 - Dropping event from queue
> appStatus. This likely means one of the listeners is too slow and cannot
> keep up with the rate at which tasks are being started by the scheduler.
> 2018-05-27 12:51:11 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Thu Jan
> 01 05:30:00 IST 1970.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:52:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:51:11 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:53:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:52:14 IST 2018.
> 2018-05-27 12:54:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:53:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:55:14 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:54:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:56:15 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:55:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:57:32 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:56:15 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:58:32 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:57:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 12:59:33 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:58:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:00:34 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 12:59:33 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:01:35 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 13:00:34 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 +
> 53) / 200]2018-05-27 13:02:36 WARN  AsyncEventQueue:66 - Dropped
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May
> 27 13:01:35 IST 2018.
>

Even though my job is not failing but why am I getting these?

Thanks,
Aakash.


Re: Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
I think I found the solution.

The last comment from this link -
https://issues.apache.org/jira/browse/SPARK-14948

But, my question is even after using table.column, why does Spark find the
same column name from two different tables ambiguous?

I mean table1.column = table2.column, Spark should comprehend that even
though the name of column is same but they come from two different tables,
isn't?

Well, I'll try out the solution provided above, and see if it works for me.

Thanks!

On Sat, May 26, 2018 at 9:45 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> You're right.
>
> The same set of queries are working for max 2 columns in loop.
>
> If I give more than 2 column, the 2nd column is failing with this error -
>
> *attribute(s) with the same name appear in the operation:
> marginal_adhesion_bucketed. Please check if the right attribute(s) are
> used.*
>
> Any idea on what maybe the reason?
>
> I rechecked the query, its has correct logic.
>
> On Sat, May 26, 2018 at 9:35 PM, hemant singh <hemant2...@gmail.com>
> wrote:
>
>> Per the sql plan this is where it is failing -
>>
>> Attribute(s) with the same name appear in the operation: fnlwgt_bucketed. 
>> Please check if the right attribute(s) are used.;
>>
>>
>>
>> On Sat, May 26, 2018 at 6:16 PM, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> This query is based on one step further from the query in this link
>>> <https://stackoverflow.com/questions/50530679/spark-2-3-asynceventqueue-error-and-warning>.
>>> In this scenario, I add 1 or 2 more columns to be processed, Spark throws
>>> an ERROR by printing the physical plan of queries.
>>>
>>> It says, *Resolved attribute(s) fnlwgt_bucketed#152530 missing* which
>>> is untrue, as if I run the same code on less than 3 columns where this is
>>> one column, it works like a charm, so I can clearly assume it is not a bug
>>> in my query or code.
>>>
>>> Is it then a out of memory error? As I think, internally, since there
>>> are many registered tables on memory, they're getting deleted due to
>>> overflow of data and getting deleted, this is totally my assumption. Any
>>> insight on this? Did anyone of you face any issue like this?
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.: 
>>> org.apache.spark.sql.AnalysisException: Resolved attribute(s) 
>>> fnlwgt_bucketed#152530 missing from 
>>> occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
>>>  in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>>> education#14, education-num#15, marital-status#16, occupation#17, 
>>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#152432, 
>>> fnlwgt_bucketed#152530, if (isnull(cast(hours-per-week#23 as double))) null 
>>> else if (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else if 
>>> (isnull(cast(hours-per-week#23 as double))) null else 
>>> UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS 
>>> hours-per-week_bucketed#152299]. Attribute(s) with the same name appear in 
>>> the operation: fnlwgt_bucketed. Please check if the right attribute(s) are 
>>> used.;;Project [id_num#10, age#11, workclass#12, fnlwgt#13, education#14, 
>>> education-num#15, marital-status#16, occupation#17, relationship#18, 
>>> race#19, sex#20, capital-gain#21, capital-loss#22, hours-per-week#23, 
>>> native_country#24, high_income#25, age_bucketed#48257, 
>>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, 
>>> age_bucketed_WoE#152431, WoE#152524 AS fnlwgt_bucketed_WoE#152529]+- Join 
>>&g

Re: Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
You're right.

The same set of queries are working for max 2 columns in loop.

If I give more than 2 column, the 2nd column is failing with this error -

*attribute(s) with the same name appear in the operation:
marginal_adhesion_bucketed. Please check if the right attribute(s) are
used.*

Any idea on what maybe the reason?

I rechecked the query, its has correct logic.

On Sat, May 26, 2018 at 9:35 PM, hemant singh <hemant2...@gmail.com> wrote:

> Per the sql plan this is where it is failing -
>
> Attribute(s) with the same name appear in the operation: fnlwgt_bucketed. 
> Please check if the right attribute(s) are used.;
>
>
>
> On Sat, May 26, 2018 at 6:16 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi,
>>
>> This query is based on one step further from the query in this link
>> <https://stackoverflow.com/questions/50530679/spark-2-3-asynceventqueue-error-and-warning>.
>> In this scenario, I add 1 or 2 more columns to be processed, Spark throws
>> an ERROR by printing the physical plan of queries.
>>
>> It says, *Resolved attribute(s) fnlwgt_bucketed#152530 missing* which is
>> untrue, as if I run the same code on less than 3 columns where this is one
>> column, it works like a charm, so I can clearly assume it is not a bug in
>> my query or code.
>>
>> Is it then a out of memory error? As I think, internally, since there are
>> many registered tables on memory, they're getting deleted due to overflow
>> of data and getting deleted, this is totally my assumption. Any insight on
>> this? Did anyone of you face any issue like this?
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.: 
>> org.apache.spark.sql.AnalysisException: Resolved attribute(s) 
>> fnlwgt_bucketed#152530 missing from 
>> occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
>>  in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation#17, 
>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#152432, 
>> fnlwgt_bucketed#152530, if (isnull(cast(hours-per-week#23 as double))) null 
>> else if (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else if 
>> (isnull(cast(hours-per-week#23 as double))) null else 
>> UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS 
>> hours-per-week_bucketed#152299]. Attribute(s) with the same name appear in 
>> the operation: fnlwgt_bucketed. Please check if the right attribute(s) are 
>> used.;;Project [id_num#10, age#11, workclass#12, fnlwgt#13, education#14, 
>> education-num#15, marital-status#16, occupation#17, relationship#18, 
>> race#19, sex#20, capital-gain#21, capital-loss#22, hours-per-week#23, 
>> native_country#24, high_income#25, age_bucketed#48257, 
>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, 
>> age_bucketed_WoE#152431, WoE#152524 AS fnlwgt_bucketed_WoE#152529]+- Join 
>> Inner, (fnlwgt_bucketed#99009 = fnlwgt_bucketed#152530)
>>:- SubqueryAlias bucketed
>>:  +- SubqueryAlias a
>>: +- Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation#17, 
>> relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22, 
>> hours-per-week#23, native_country#24, high_income#25, age_bucketed#48257, 
>> fnlwgt_bucketed#99009, hours-per-week_bucketed#152299, WoE#152426 AS 
>> age_bucketed_WoE#152431]
>>:+- Join Inner, (age_bucketed#48257 = age_bucketed#152432)
>>:   :- SubqueryAlias bucketed
>>:   :  +- SubqueryAlias a
>>:   : +- Project [id_num#10, age#11, workclass#12, fnlwgt#13, 
>> education#14, education-num#15, marital-status#16, occupation

Re: Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
Well, it did, meaning, internally a TempTable and a TempView are the same.

Thanks buddy!

On Sat, May 26, 2018 at 9:23 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Question is, while registering, using registerTempTable() and while
> dropping, using a dropTempView(), would it go and hit the same TempTable
> internally or would search for a registered view? Not sure. Any idea?
>
> On Sat, May 26, 2018 at 9:04 PM, SNEHASISH DUTTA <info.snehas...@gmail.com
> > wrote:
>
>> I think it's dropTempView
>>
>> On Sat, May 26, 2018, 8:56 PM Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to use dropTempTable() after the respective Temporary Table's
>>> use is over (to free up the memory for next calculations).
>>>
>>> Newer Spark Session doesn't need sqlContext, so, it is confusing me on
>>> how to use the function.
>>>
>>> 1) Tried, same DF which I used to register a temp table to do -
>>>
>>> DF.dropTempTable('xyz')
>>>
>>> Didn't work.
>>>
>>> 2) Tried following way too, as spark internally invokes sqlContext too
>>> along with sparkContext, but didn't work -
>>>
>>> spark.dropTempTable('xyz')
>>>
>>> 3) Tried spark.catalog to drop, this failed too -
>>>
>>> spark.catalog.dropTempTable('xyz')
>>>
>>>
>>> What to do? 1.6 examples on internet are not working in the 2.3 version
>>> for dropTempTable().
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>


Re: Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
Question is, while registering, using registerTempTable() and while
dropping, using a dropTempView(), would it go and hit the same TempTable
internally or would search for a registered view? Not sure. Any idea?

On Sat, May 26, 2018 at 9:04 PM, SNEHASISH DUTTA <info.snehas...@gmail.com>
wrote:

> I think it's dropTempView
>
> On Sat, May 26, 2018, 8:56 PM Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm trying to use dropTempTable() after the respective Temporary Table's
>> use is over (to free up the memory for next calculations).
>>
>> Newer Spark Session doesn't need sqlContext, so, it is confusing me on
>> how to use the function.
>>
>> 1) Tried, same DF which I used to register a temp table to do -
>>
>> DF.dropTempTable('xyz')
>>
>> Didn't work.
>>
>> 2) Tried following way too, as spark internally invokes sqlContext too
>> along with sparkContext, but didn't work -
>>
>> spark.dropTempTable('xyz')
>>
>> 3) Tried spark.catalog to drop, this failed too -
>>
>> spark.catalog.dropTempTable('xyz')
>>
>>
>> What to do? 1.6 examples on internet are not working in the 2.3 version
>> for dropTempTable().
>>
>> Thanks,
>> Aakash.
>>
>


Silly question on Dropping Temp Table

2018-05-26 Thread Aakash Basu
Hi all,

I'm trying to use dropTempTable() after the respective Temporary Table's
use is over (to free up the memory for next calculations).

Newer Spark Session doesn't need sqlContext, so, it is confusing me on how
to use the function.

1) Tried, same DF which I used to register a temp table to do -

DF.dropTempTable('xyz')

Didn't work.

2) Tried following way too, as spark internally invokes sqlContext too
along with sparkContext, but didn't work -

spark.dropTempTable('xyz')

3) Tried spark.catalog to drop, this failed too -

spark.catalog.dropTempTable('xyz')


What to do? 1.6 examples on internet are not working in the 2.3 version for
dropTempTable().

Thanks,
Aakash.


Spark 2.3 Tree Error

2018-05-26 Thread Aakash Basu
Hi,

This query is based on one step further from the query in this link
.
In this scenario, I add 1 or 2 more columns to be processed, Spark throws
an ERROR by printing the physical plan of queries.

It says, *Resolved attribute(s) fnlwgt_bucketed#152530 missing* which is
untrue, as if I run the same code on less than 3 columns where this is one
column, it works like a charm, so I can clearly assume it is not a bug in
my query or code.

Is it then a out of memory error? As I think, internally, since there are
many registered tables on memory, they're getting deleted due to overflow
of data and getting deleted, this is totally my assumption. Any insight on
this? Did anyone of you face any issue like this?

py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.:
org.apache.spark.sql.AnalysisException: Resolved attribute(s)
fnlwgt_bucketed#152530 missing from
occupation#17,high_income#25,fnlwgt#13,education#14,marital-status#16,relationship#18,workclass#12,sex#20,id_num#10,native_country#24,race#19,education-num#15,hours-per-week#23,age_bucketed#152432,capital-loss#22,age#11,capital-gain#21,fnlwgt_bucketed#99009
in operator !Project [id_num#10, age#11, workclass#12, fnlwgt#13,
education#14, education-num#15, marital-status#16, occupation#17,
relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22,
hours-per-week#23, native_country#24, high_income#25,
age_bucketed#152432, fnlwgt_bucketed#152530, if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else
UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS
hours-per-week_bucketed#152299]. Attribute(s) with the same name
appear in the operation: fnlwgt_bucketed. Please check if the right
attribute(s) are used.;;Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009,
hours-per-week_bucketed#152299, age_bucketed_WoE#152431, WoE#152524 AS
fnlwgt_bucketed_WoE#152529]+- Join Inner, (fnlwgt_bucketed#99009 =
fnlwgt_bucketed#152530)
   :- SubqueryAlias bucketed
   :  +- SubqueryAlias a
   : +- Project [id_num#10, age#11, workclass#12, fnlwgt#13,
education#14, education-num#15, marital-status#16, occupation#17,
relationship#18, race#19, sex#20, capital-gain#21, capital-loss#22,
hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009,
hours-per-week_bucketed#152299, WoE#152426 AS age_bucketed_WoE#152431]
   :+- Join Inner, (age_bucketed#48257 = age_bucketed#152432)
   :   :- SubqueryAlias bucketed
   :   :  +- SubqueryAlias a
   :   : +- Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, fnlwgt_bucketed#99009, if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else if
(isnull(cast(hours-per-week#23 as double))) null else
UDF:bucketizer_0(cast(hours-per-week#23 as double)) AS
hours-per-week_bucketed#152299]
   :   :+- Project [id_num#10, age#11, workclass#12,
fnlwgt#13, education#14, education-num#15, marital-status#16,
occupation#17, relationship#18, race#19, sex#20, capital-gain#21,
capital-loss#22, hours-per-week#23, native_country#24, high_income#25,
age_bucketed#48257, if (isnull(cast(fnlwgt#13 as double))) null else
if (isnull(cast(fnlwgt#13 as double))) null else if
(isnull(cast(fnlwgt#13 as double))) null else
UDF:bucketizer_0(cast(fnlwgt#13 as double)) AS fnlwgt_bucketed#99009]
   :   :   +- Project [id_num#10, age#11,
workclass#12, fnlwgt#13, education#14, education-num#15,
marital-status#16, occupation#17, relationship#18, race#19, sex#20,
capital-gain#21, capital-loss#22, hours-per-week#23,
native_country#24, high_income#25, if (isnull(cast(age#11 as double)))
null else if (isnull(cast(age#11 as double))) null else if
(isnull(cast(age#11 as double))) null else

Spark 2.3 Memory Leak on Executor

2018-05-26 Thread Aakash Basu
Hi,

I am getting memory leak warning which ideally was a Spark bug back till
1.6 version and was resolved.

Mode: Standalone IDE: PyCharm Spark version: 2.3 Python version: 3.6

Below is the stack trace -

2018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak detected;
size = 262144 bytes, TID = 31482018-05-25 15:00:05 WARN  Executor:66 -
Managed memory leak detected; size = 262144 bytes, TID =
31522018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31512018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31502018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31492018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31532018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31542018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31582018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31552018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31572018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31602018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31612018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31562018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31592018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31652018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
31632018-05-25 15:00:05 WARN  Executor:66 - Managed memory leak
detected; size = 262144 bytes, TID = 31622018-05-25 15:00:05 WARN
Executor:66 - Managed memory leak detected; size = 262144 bytes, TID =
3166

Any insight on why it may happen? Though my job is successfully getting
accomplished.

I've posted the query on StackOverflow

too.

P.S - No connection to database is kept open (as per a comment there).

Thanks,
Aakash.


[Query] Weight of evidence on Spark

2018-05-25 Thread Aakash Basu
Hi guys,

What's the best way to create feature column with Weight of Evidence
calculated for categorical columns on target column (both Binary and
Multi-Class)?

Any insight?

Thanks,
Aakash.


Fwd: XGBoost on PySpark

2018-05-23 Thread Aakash Basu
Guys any insight on the below?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Sat, May 19, 2018 at 12:21 PM
Subject: XGBoost on PySpark
To: user <user@spark.apache.org>


Hi guys,

I need help in implementing XG-Boost in PySpark.

As per the conversation in a popular thread regarding XGB goes, it is
available in Scala and Java versions but not Python. But, we've to
implement a pythonic distributed solution (on Spark) maybe using DMLC or
similar, to go ahead with XGB solutioning.

Anybody implemented the same? If yes, please share some insight on how to
go about it.

Thanks,
Aakash.


XGBoost on PySpark

2018-05-19 Thread Aakash Basu
Hi guys,

I need help in implementing XG-Boost in PySpark.

As per the conversation in a popular thread regarding XGB goes, it is
available in Scala and Java versions but not Python. But, we've to
implement a pythonic distributed solution (on Spark) maybe using DMLC or
similar, to go ahead with XGB solutioning.

Anybody implemented the same? If yes, please share some insight on how to
go about it.

Thanks,
Aakash.


[How To] Using Spark Session in internal called classes

2018-04-23 Thread Aakash Basu
Hi,

I have created my own Model Tuner class which I want to use to tune models
and return a Model object if the user expects. This Model Tuner is in a
file which I would ideally import into another file and call the class and
use it.

Outer file {from where I'd be calling the Model Tuner): I am using
SparkSession to start my Spark job and then read a file, do some basic
operations to prepare it as a Dense Vector and then send it into the Model
to tune.

While I send it into the model, I get this *error*:

*AttributeError: Cannot load _jvm from SparkContext. Is SparkContext
initialized?*


The *full traceback*:

Traceback (most recent call last):
  File "/home/aakashbasu/PycharmProjects/bb_lite/2.0
backend/model_tuning/aakash_test.py", line 9, in 
from model_tuning.model_tuning_newer import ModelTuner
  File "/home/aakashbasu/PycharmProjects/bb_lite/2.0
backend/model_tuning/model_tuning.py", line 13, in 
labelCol="label", predictionCol="prediction", metricName="accuracy")
  File "/usr/local/lib/python3.5/dist-packages/pyspark/__init__.py", line
105, in wrapper
return func(self, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/ml/evaluation.py",
line 298, in __init__
"org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator",
self.uid)
  File "/usr/local/lib/python3.5/dist-packages/pyspark/ml/wrapper.py", line
59, in _new_java_obj
java_obj = _jvm()
  File "/usr/local/lib/python3.5/dist-packages/pyspark/ml/util.py", line
44, in _jvm
raise AttributeError("Cannot load _jvm from SparkContext. Is
SparkContext initialized?")
AttributeError: Cannot load _jvm from SparkContext. Is SparkContext
initialized?



How to ensure that the inner file also uses the same Spark Context which is
initiated by the outer file using SparkSession builder?

Any help?

Thanks,
Aakash.


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hi Gerard,

"If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem."

I tried literally everything, nothing worked out.

1) Tried NC from two different ports for two diff streams, still nothing
worked.

2) Tried same using Kafka with awaitAnyTermination, still no use, the first
stream write kept on blocking the second... (And inner queries with
aggregation doesn't work in Spark Streaming it seems, as it expects a
separate writeStream.start()).

Any insight (or direct update to the code would be helpful).

Thanks,
Aakash.

On Mon 16 Apr, 2018, 9:05 PM Gerard Maas, <gerard.m...@gmail.com> wrote:

> Aakash,
>
> There are two issues here.
> The issue with the code on the first question is that the first query
> blocks and the code for the second does not get executed. Panagiotis
> pointed this out correctly.
> In the updated code, the issue is related to netcat (nc) and the way
> structured streaming works. As far as I remember, netcat only delivers data
> to the first network connection.
> On the structured streaming side, each query will issue its own
> connection. This results in only the first query getting the data.
> If you would talk to a TPC server supporting multiple connected clients,
> you would see data in both queries.
>
> If your actual source is Kafka, the original solution of using
> `spark.streams.awaitAnyTermination`  should solve the problem.
>
> -kr, Gerard.
>
>
>
> On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hey Jayesh and Others,
>>
>> Is there then, any other way to come to a solution for this use-case?
>>
>> Thanks,
>> Aakash.
>>
>> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
>> jayesh.lalw...@capitalone.com> wrote:
>>
>>> Note that what you are trying to do here is join a streaming data frame
>>> with an aggregated streaming data frame. As per the documentation, joining
>>> an aggregated streaming data frame with another streaming data frame is not
>>> supported
>>>
>>>
>>>
>>>
>>>
>>> *From: *spark receiver <spark.recei...@gmail.com>
>>> *Date: *Friday, April 13, 2018 at 11:49 PM
>>> *To: *Aakash Basu <aakash.spark@gmail.com>
>>> *Cc: *Panagiotis Garefalakis <panga...@gmail.com>, user <
>>> user@spark.apache.org>
>>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>>
>>>
>>>
>>> Hi Panagiotis ,
>>>
>>>
>>>
>>> Wondering you solved the problem or not? Coz I met the same issue today.
>>> I’d appreciate  so much if you could paste the code snippet  if it’s
>>> working .
>>>
>>>
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> 在 2018年4月6日,上午7:40,Aakash Basu <aakash.spark@gmail.com> 写道:
>>>
>>>
>>>
>>> Hi Panagiotis,
>>>
>>> I did that, but it still prints the result of the first query and awaits
>>> for new data, doesn't even goes to the next one.
>>>
>>> *Data -*
>>>
>>> $ nc -lk 9998
>>>
>>> 1,2
>>> 3,4
>>> 5,6
>>> 7,8
>>>
>>> *Result -*
>>>
>>> ---
>>> Batch: 0
>>> ---
>>> ++
>>> |aver|
>>> ++
>>> | 3.0|
>>> ++
>>>
>>> ---
>>> Batch: 1
>>> ---
>>> ++
>>> |aver|
>>> ++
>>> | 4.0|
>>> ++
>>>
>>>
>>>
>>> *Updated Code -*
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import split
>>>
>>> spark = SparkSession \
>>> .builder \
>>> .appName("StructuredNetworkWordCount") \
>>> .getOrCreate()
>>>
>>> data = spark \
>>> .readStream \
>>> .format("socket") \
>>> .option("header","true") \
>>> .option("host", "localhost") \
>>> .option("port", 9998) \
>>> .load("csv")
>>>
>>>
>>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>>> split(data.value, ","

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
If I use timestamp based windowing, then my average will not be global
average but grouped by timestamp, which is not my requirement. I want to
recalculate the avg of entire column, every time a new row(s) comes in and
divide the other column with the updated avg.

Let me know, in-case you or anyone else has any soln. for this.

On Mon, Apr 16, 2018 at 7:52 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> You could do it if you had a timestamp in your data.  You can use windowed
> operations to divide a value by it’s own average over a window. However, in
> structured streaming, you can only window by timestamp columns. You cannot
> do windows aggregations on integers.
>
>
>
> *From: *Aakash Basu <aakash.spark@gmail.com>
> *Date: *Monday, April 16, 2018 at 4:52 AM
> *To: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Cc: *spark receiver <spark.recei...@gmail.com>, Panagiotis Garefalakis <
> panga...@gmail.com>, user <user@spark.apache.org>
>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hey Jayesh and Others,
>
> Is there then, any other way to come to a solution for this use-case?
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is not
> supported
>
>
>
>
>
> *From: *spark receiver <spark.recei...@gmail.com>
> *Date: *Friday, April 13, 2018 at 11:49 PM
> *To: *Aakash Basu <aakash.spark@gmail.com>
> *Cc: *Panagiotis Garefalakis <panga...@gmail.com>, user <
> user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hi Panagiotis ,
>
>
>
> Wondering you solved the problem or not? Coz I met the same issue today.
> I’d appreciate  so much if you could paste the code snippet  if it’s
> working .
>
>
>
> Thanks.
>
>
>
>
>
> 在 2018年4月6日,上午7:40,Aakash Basu <aakash.spark@gmail.com> 写道:
>
>
>
> Hi Panagiotis,
>
> I did that, but it still prints the result of the first query and awaits
> for new data, doesn't even goes to the next one.
>
> *Data -*
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> *Result -*
>
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
>
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
>
>
>
> *Updated Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
>
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
>
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com>
> wrote:
>
> Hello Aakash,
>
>
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
>
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTerminat

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hey Jayesh and Others,

Is there then, any other way to come to a solution for this use-case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is not
> supported
>
>
>
>
>
> *From: *spark receiver <spark.recei...@gmail.com>
> *Date: *Friday, April 13, 2018 at 11:49 PM
> *To: *Aakash Basu <aakash.spark@gmail.com>
> *Cc: *Panagiotis Garefalakis <panga...@gmail.com>, user <
> user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hi Panagiotis ,
>
>
>
> Wondering you solved the problem or not? Coz I met the same issue today.
> I’d appreciate  so much if you could paste the code snippet  if it’s
> working .
>
>
>
> Thanks.
>
>
>
>
>
> 在 2018年4月6日,上午7:40,Aakash Basu <aakash.spark@gmail.com> 写道:
>
>
>
> Hi Panagiotis,
>
> I did that, but it still prints the result of the first query and awaits
> for new data, doesn't even goes to the next one.
>
> *Data -*
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> *Result -*
>
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
>
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
>
>
>
> *Updated Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
>
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
>
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com>
> wrote:
>
> Hello Aakash,
>
>
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
>
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
>
>
> I hope this helps.
>
>
>
> Cheers,
>
> Panagiotis
>
>
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
> Any help?
>
> Need urgent help. Someone please clarify the doubt?
>
>
>
> -- Forwarded message --
> From: *Aakash Basu* <aakash.spark@gmail.com>
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user <user@spark.apache.org>
>
> Hi,
>
> If I have more than one writeStream in a code, which operates on the same
> readStream data, why does it produce only the first writeStream? I want the
> second one to be also printed on the console.
>
> How to do that?
>
>
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> 

Is DLib available for Spark?

2018-04-10 Thread Aakash Basu
Hi team,

Is DLib package available for use through Spark?

Thanks,
Aakash.


Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-09 Thread Aakash Basu
Hey Felix,

I've already tried with

.format("memory")
   .queryName("tableName")

but, still, it doesn't work for the second query. It just stalls the
program expecting new data for the first query.

Here's my code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")


id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")
query2 = df \
.writeStream \
.format("memory") \
.queryName("ABCD") \
.outputMode("complete") \
.trigger(processingTime='5 seconds') \
.start()

wordCounts = spark.sql("Select col1, col2, col2/(select aver from
ABCD) col3 from ds")

query = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='5 seconds') \
.start()

spark.streams.awaitAnyTermination()


Here's my data -

1,2
3,4
5,6
7,8
9,10
11,12
13,14


What do you thing the problem maybe?


Thanks in adv,
Aakash.

On Fri, Apr 6, 2018 at 9:55 PM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Instead of write to console you need to write to memory for it to be
> queryable
>
>
>  .format("memory")
>.queryName("tableName")
> https://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#output-sinks
>
> --
> *From:* Aakash Basu <aakash.spark@gmail.com>
> *Sent:* Friday, April 6, 2018 3:22:07 AM
> *To:* user
> *Subject:* Fwd: [Structured Streaming Query] Calculate Running Avg from
> Kafka feed using SQL query
>
> Any help?
>
> Need urgent help. Someone please clarify the doubt?
>
>
> -- Forwarded message --
> From: Aakash Basu <aakash.spark@gmail.com>
> Date: Mon, Apr 2, 2018 at 1:01 PM
> Subject: [Structured Streaming Query] Calculate Running Avg from Kafka
> feed using SQL query
> To: user <user@spark.apache.org>, "Bowden, Chris" <
> chris.bow...@microfocus.com>
>
>
> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1Col2
> 1  10
> 2  11
> 3  12
> 4  13
> 5  14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming. *
>
> *Avg(Col1) = Running Avg *
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-06 Thread Aakash Basu
Hi Panagiotis,

I did that, but it still prints the result of the first query and awaits
for new data, doesn't even goes to the next one.

*Data -*

$ nc -lk 9998

1,2
3,4
5,6
7,8

*Result -*

---
Batch: 0
---
++
|aver|
++
| 3.0|
++

---
Batch: 1
---
++
|aver|
++
| 4.0|
++


*Updated Code -*

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")


id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from
abcd) col3 from ds")  # (select aver from abcd)

query2 = df \
.writeStream \
.format("console") \
.outputMode("complete") \
.trigger(processingTime='5 seconds') \
.start()

query = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='5 seconds') \
.start()

spark.streams.awaitAnyTermination()



Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <panga...@gmail.com>
wrote:

> Hello Aakash,
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
> I hope this helps.
>
> Cheers,
> Panagiotis
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Any help?
>>
>> Need urgent help. Someone please clarify the doubt?
>>
>> -- Forwarded message --
>> From: Aakash Basu <aakash.spark@gmail.com>
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user <user@spark.apache.org>
>>
>>
>> Hi,
>>
>> If I have more than one writeStream in a code, which operates on the same
>> readStream data, why does it produce only the first writeStream? I want the
>> second one to be also printed on the console.
>>
>> How to do that?
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split, col
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("Stream_Col_Oper_Spark") \
>> .getOrCreate()
>>
>> data = spark.readStream.format("kafka") \
>> .option("startingOffsets", "latest") \
>> .option("kafka.bootstrap.servers", "localhost:9092") \
>> .option("subscribe", "test1") \
>> .load()
>>
>> ID = data.select('value') \
>> .withColumn('value', data.value.cast("string")) \
>> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>> .drop('value')
>>
>> ID.createOrReplaceTempView("transformed_Stream_DF")
>>
>> df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>>
>> df.createOrReplaceTempView("abcd")
>>
>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) 
>> col3 from transformed_Stream_DF")
>>
>>
>> # ---#
>>
>> query1 = df \
>> .writeStream \
>> .format("console") \
>> .outputMode("complete") \
>> .trigger(processingTime='3 seconds') \
>> .start()
>>
>> query1.awaitTermination()
>> # ---#
>>
>> query2 = wordCounts \
>> .writeStream \
>> .format("console") \
>> .trigger(processingTime='3 seconds') \
>> .start()
>>
>> query2.awaitTermination()
>>
>> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit 
>> --packages 
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
>>  
>> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>>
>>
>>
>>
>> Thanks,
>> Aakash.
>>
>>
>


Fwd: [Structured Streaming] More than 1 streaming in a code

2018-04-06 Thread Aakash Basu
Any help?

Need urgent help. Someone please clarify the doubt?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Thu, Apr 5, 2018 at 3:18 PM
Subject: [Structured Streaming] More than 1 streaming in a code
To: user <user@spark.apache.org>


Hi,

If I have more than one writeStream in a code, which operates on the same
readStream data, why does it produce only the first writeStream? I want the
second one to be also printed on the console.

How to do that?

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")

df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from
abcd) col3 from transformed_Stream_DF")


# ---#

query1 = df \
.writeStream \
.format("console") \
.outputMode("complete") \
.trigger(processingTime='3 seconds') \
.start()

query1.awaitTermination()
# ---#

query2 = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='3 seconds') \
.start()

query2.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages 
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.


Fwd: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-06 Thread Aakash Basu
Any help?

Need urgent help. Someone please clarify the doubt?


-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Mon, Apr 2, 2018 at 1:01 PM
Subject: [Structured Streaming Query] Calculate Running Avg from Kafka feed
using SQL query
To: user <user@spark.apache.org>, "Bowden, Chris" <
chris.bow...@microfocus.com>


Hi,

This is a very interesting requirement, where I am getting stuck at a few
places.

*Requirement* -

Col1Col2
1  10
2  11
3  12
4  13
5  14



*I have to calculate avg of col1 and then divide each row of col2 by that
avg. And, the Avg should be updated with every new data being fed through
Kafka into Spark Streaming.*

*Avg(Col1) = Running Avg*
*Col2 = Col2/Avg(Col1)*


*Queries* *-*


*1) I am currently trying to simply run a inner query inside a query and
print Avg with other Col value and then later do the calculation. But,
getting error.*

Query -

select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF)
as myAvg from transformed_Stream_DF t

Error -

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources
must be executed with writeStream.start();

Even though, I already have writeStream.start(); in my code, it is probably
throwing the error because of the inner select query (I think Spark is
assuming it as another query altogether which require its own
writeStream.start. Any help?


*2) How to go about it? *I have another point in mind, i.e, querying the
table to get the avg and store it in a variable. In the second query simply
pass the variable and divide the second column to produce appropriate
result. But, is it the right approach?

*3) Final question*: How to do the calculation over the entire data and not
the latest, do I need to keep appending somewhere and repeatedly use it? My
average and all the rows of the Col2 shall change with every new incoming
data.


*Code -*

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")
aggregate_func = spark.sql(
"select t.Col2 , (Select AVG(Col1) as Avg from
transformed_Stream_DF) as myAvg from transformed_Stream_DF t")  #
(Col2/(AVG(Col1)) as Col3)")

# ---For Console Print---

query = aggregate_func \
.writeStream \
.format("console") \
.start()
# .outputMode("complete") \
# ---Console Print ends---

query.awaitTermination()
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.


Fwd: [Structured Streaming] How to save entire column aggregation to a file

2018-04-06 Thread Aakash Basu
Any help?

Need urgent help. Someone please clarify the doubt?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Thu, Apr 5, 2018 at 2:28 PM
Subject: [Structured Streaming] How to save entire column aggregation to a
file
To: user <user@spark.apache.org>


Hi,

I want to save an aggregate to a file without using any window, watermark
or groupBy. So, my aggregation is at entire column level.

df = spark.sql("select avg(col1) as aver from ds")


Now, the challenge is as follows -

1) If I use outputMode = Append, but "*Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark*"

query2 = df \
.writeStream \
.format("parquet") \
.option("path", "/home/aakashbasu/Downloads/Kafka_Testing/Temp_AvgStore/") \
.option("checkpointLocation", "/home/aakashbasu/Downloads/Kafka_Testing/") \
.trigger(processingTime='3 seconds') \
.start()



2) If I use outputMode = Complete, but "*Data source parquet does not
support Complete output mode;*"

query2 = df \
.writeStream \
.outputMode("complete") \
.format("parquet") \
.option("path", "/home/aakashbasu/Downloads/Kafka_Testing/Temp_AvgStore/") \
.option("checkpointLocation", "/home/aakashbasu/Downloads/Kafka_Testing/") \
.trigger(processingTime='3 seconds') \
.start()


What to do? How to go about it?

Thanks,
Aakash.


Fwd: Spark Structured Streaming Inner Queries fails

2018-04-06 Thread Aakash Basu
Any help?

Need urgent help. Someone please clarify the doubt?

-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Thu, Apr 5, 2018 at 2:50 PM
Subject: Spark Structured Streaming Inner Queries fails
To: user <user@spark.apache.org>


Hi,

Why are inner queries not allowed in Spark Streaming? Spark assumes the
inner query to be a separate stream altogether and expects it to be
triggered with a separate writeStream.start().

Why so?

Error: pyspark.sql.utils.StreamingQueryException: 'Queries with streaming
sources must be executed with writeStream.start();;\ntextSocket\n===
Streaming Query ===\nIdentifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c,
runId = 0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]\nCurrent Committed Offsets:
{}\nCurrent Available Offsets: {TextSocketSource[host: localhost, port:
9998]: 5}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical
Plan:\nProject [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8
[]) AS col3#9]\n:  +- Aggregate [avg(cast(col1#3 as double)) AS
aver#7]\n: +- SubqueryAlias ds\n:+- Project [split(value#1,
,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]\n:   +-
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf,
socket,List(),None,List(),None,Map(header -> true, host -> localhost, path
-> csv, port -> 9998),None), textSocket, [value#1]\n+- SubqueryAlias ds\n
+- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS
col2#4]\n  +- StreamingExecutionRelation TextSocketSource[host:
localhost, port: 9998], [value#1]\n'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
.builder \
.appName("StructuredRunningAvg") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")

id = data.select(split(data.value, ",").getItem(0).alias("col1"),
split(data.value, ",").getItem(1).alias("col2"))


id.createOrReplaceTempView("ds")

final_DF = spark.sql("Select col1, col2, col2/(select avg(col1) as
aver from ds) col3 from ds")

query = final_DF \
.writeStream \
.format("console") \
.start()

query.awaitTermination()



Thanks,
Aakash.


[Structured Streaming] More than 1 streaming in a code

2018-04-05 Thread Aakash Basu
Hi,

If I have more than one writeStream in a code, which operates on the same
readStream data, why does it produce only the first writeStream? I want the
second one to be also printed on the console.

How to do that?

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")

df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from
abcd) col3 from transformed_Stream_DF")


# ---#

query1 = df \
.writeStream \
.format("console") \
.outputMode("complete") \
.trigger(processingTime='3 seconds') \
.start()

query1.awaitTermination()
# ---#

query2 = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='3 seconds') \
.start()

query2.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages 
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.


Spark Structured Streaming Inner Queries fails

2018-04-05 Thread Aakash Basu
Hi,

Why are inner queries not allowed in Spark Streaming? Spark assumes the
inner query to be a separate stream altogether and expects it to be
triggered with a separate writeStream.start().

Why so?

Error: pyspark.sql.utils.StreamingQueryException: 'Queries with streaming
sources must be executed with writeStream.start();;\ntextSocket\n===
Streaming Query ===\nIdentifier: [id =
f77611ee-ce1c-4b16-8812-0f1afe05562c, runId =
0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]\nCurrent Committed Offsets:
{}\nCurrent Available Offsets: {TextSocketSource[host: localhost, port:
9998]: 5}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical
Plan:\nProject [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8
[]) AS col3#9]\n:  +- Aggregate [avg(cast(col1#3 as double)) AS
aver#7]\n: +- SubqueryAlias ds\n:+- Project [split(value#1,
,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]\n:   +-
StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@3ac605bf,socket,List(),None,List(),None,Map(header
-> true, host -> localhost, path -> csv, port -> 9998),None), textSocket,
[value#1]\n+- SubqueryAlias ds\n   +- Project [split(value#1, ,)[0] AS
col1#3, split(value#1, ,)[1] AS col2#4]\n  +-
StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998],
[value#1]\n'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
.builder \
.appName("StructuredRunningAvg") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")

id = data.select(split(data.value, ",").getItem(0).alias("col1"),
split(data.value, ",").getItem(1).alias("col2"))


id.createOrReplaceTempView("ds")

final_DF = spark.sql("Select col1, col2, col2/(select avg(col1) as
aver from ds) col3 from ds")

query = final_DF \
.writeStream \
.format("console") \
.start()

query.awaitTermination()



Thanks,
Aakash.


[Structured Streaming] How to save entire column aggregation to a file

2018-04-05 Thread Aakash Basu
Hi,

I want to save an aggregate to a file without using any window, watermark
or groupBy. So, my aggregation is at entire column level.

df = spark.sql("select avg(col1) as aver from ds")


Now, the challenge is as follows -

1) If I use outputMode = Append, but "*Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark*"

query2 = df \
.writeStream \
.format("parquet") \
.option("path", "/home/aakashbasu/Downloads/Kafka_Testing/Temp_AvgStore/") \
.option("checkpointLocation", "/home/aakashbasu/Downloads/Kafka_Testing/") \
.trigger(processingTime='3 seconds') \
.start()



2) If I use outputMode = Complete, but "*Data source parquet does not
support Complete output mode;*"

query2 = df \
.writeStream \
.outputMode("complete") \
.format("parquet") \
.option("path", "/home/aakashbasu/Downloads/Kafka_Testing/Temp_AvgStore/") \
.option("checkpointLocation", "/home/aakashbasu/Downloads/Kafka_Testing/") \
.trigger(processingTime='3 seconds') \
.start()


What to do? How to go about it?

Thanks,
Aakash.


Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Hi all,

The following is the updated code, where I'm getting the avg in a DF, but
the collect() function, to store the value as a variable and pass it to the
final select query is not working. So, avg is currently a dataframe and not
a variable with value stored in it.

New code -

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")
avg = spark.sql("select AVG(Col1) as Avg from
transformed_Stream_DF")  # .collect()[0][0]

aggregate_func = spark.sql(
"select Col1, Col2, Col2/{0} as Col3 from
transformed_Stream_DF".format(avg))  # (Col2/(AVG(Col1)) as Col3)")

# ---For Console Print---
query1 = avg \
.writeStream \
.format("console") \
.outputMode("complete") \
.start()

query = aggregate_func \
.writeStream \
.format("console") \
.start()
# .outputMode("complete") \
# ---Console Print ends---

query1.awaitTermination()
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py


If I uncomment the collect from the above code and use it, I get the
following error -

*pyspark.sql.utils.AnalysisException: u'Queries with streaming sources must
be executed with writeStream.start();;\nkafka'*

Any alternative (better) solution to get this job done, would suffice too.

Any help shall be greatly acknowledged.

Thanks,
Aakash.

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1Col2
> 1  10
> 2  11
> 3  12
> 4  13
> 5  14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
>
>

Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Any help, guys?

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1Col2
> 1  10
> 2  11
> 3  12
> 4  13
> 5  14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg 
> from transformed_Stream_DF t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
>
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
> aggregate_func = spark.sql(
> "select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) 
> as myAvg from transformed_Stream_DF t")  #  (Col2/(AVG(Col1)) as Col3)")
>
> # ---For Console Print---
>
> query = aggregate_func \
> .writeStream \
> .format("console") \
> .start()
> # .outputMode("complete") \
> # ---Console Print ends---
>
> query.awaitTermination()
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit 
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
>
> Thanks,
> Aakash.
>


[Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query

2018-04-02 Thread Aakash Basu
Hi,

This is a very interesting requirement, where I am getting stuck at a few
places.

*Requirement* -

Col1Col2
1  10
2  11
3  12
4  13
5  14



*I have to calculate avg of col1 and then divide each row of col2 by that
avg. And, the Avg should be updated with every new data being fed through
Kafka into Spark Streaming.*

*Avg(Col1) = Running Avg*
*Col2 = Col2/Avg(Col1)*


*Queries* *-*


*1) I am currently trying to simply run a inner query inside a query and
print Avg with other Col value and then later do the calculation. But,
getting error.*

Query -

select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF)
as myAvg from transformed_Stream_DF t

Error -

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources
must be executed with writeStream.start();

Even though, I already have writeStream.start(); in my code, it is probably
throwing the error because of the inner select query (I think Spark is
assuming it as another query altogether which require its own
writeStream.start. Any help?


*2) How to go about it? *I have another point in mind, i.e, querying the
table to get the avg and store it in a variable. In the second query simply
pass the variable and divide the second column to produce appropriate
result. But, is it the right approach?

*3) Final question*: How to do the calculation over the entire data and not
the latest, do I need to keep appending somewhere and repeatedly use it? My
average and all the rows of the Col2 shall change with every new incoming
data.


*Code -*

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', data.value.cast("string")) \
.withColumn("Col1", split(col("value"), ",").getItem(0)) \
.withColumn("Col2", split(col("value"), ",").getItem(1)) \
.drop('value')

ID.createOrReplaceTempView("transformed_Stream_DF")
aggregate_func = spark.sql(
"select t.Col2 , (Select AVG(Col1) as Avg from
transformed_Stream_DF) as myAvg from transformed_Stream_DF t")  #
(Col2/(AVG(Col1)) as Col3)")

# ---For Console Print---

query = aggregate_func \
.writeStream \
.format("console") \
.start()
# .outputMode("complete") \
# ---Console Print ends---

query.awaitTermination()
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.


[Query] Columnar transformation without Structured Streaming

2018-03-29 Thread Aakash Basu
Hi,

I started my Spark Streaming journey from Structured Streaming using Spark
2.3, where I can easily do Spark SQL transformations on streaming data.

But, I want to know, how can I do columnar transformation (like, running
aggregation or casting, et al) using the prior utility of DStreams? Is
there a way? Do I have to use map on RDD and go about the complex
transformative steps? Or can I convert a DStream into DF and do the job?

Appreciations in advance!

Thanks,
Aakash.


Structured Streaming Spark 2.3 Query

2018-03-22 Thread Aakash Basu
Hi,

What is the way to stop a Spark Streaming job if there is no data inflow
for an arbitrary amount of time (eg: 2 mins)?

Thanks,
Aakash.


  1   2   >